1 Azkaban概述
-
为什么需要工作流调度系统?
1)一个完整的数据分析系统通常都是由大量任务单元组成:Shell脚本程序,Java程序,MapReduce程序、Hive脚本等
2)各任务单元之间存在时间先后及前后依赖关系
3)为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行;
4)大数据场景下每个任务执行时间较长,且通常需要每天都执行这些任务,人工手动等待每个任务执行完毕再执行下一个任务显然是不可行的。
-
Azkaban是一个较轻量级的的任务调度工作,可以根据每个任务单元的依赖关系,帮我们自动执行工作流,且可以定时调度工作流程。
-
常见的工作流调度系统:
1)简单的任务调度(如数据库的定时调度任务,不存在依赖关系):直接使用Linux的Crontab来定义;
2)复杂的任务调度(复杂的工作流程,存在依赖关系):开发调度平台或使用现成的开源调度系统,比如Ooize(Oozie是CDH平台下的工作流调度系统,配合CDH下的HUE进行可视化操作会比较方便,脱离了HUE会比较复杂难操作)、Azkaban(简单易用,通过yaml文件和页面进行工作流调度)、 Airflow(使用python进行工作流调度,需要python基础)、DolphinScheduler(可视化页面进行配置,也比较方便)等。
-
Azkaban的架构
Azkaban Web Server:主要的功能有用户管理和权限管理、任务的定时和触发
Azkaban Executor Server:负责任务的执行
Mysql:存储工作流的配置、定时规则、任务的执行状态 -
Azkaban有两种部署模式,一种是单机模式(测试使用,一个进程,包含了Web Server和Executor Server),一种是集群模式(生产环境使用,Web Server和Executor Server是两个进程,可以部署多Executor Server,用于容灾和负载均衡)
2 Azkaban集群模式安装
-
集群规划,在hadoop102上部署Web Server,在hadoop102、hadoop103、hadoop104部署Executor Server
-
安装步骤
(1)将azkaban-db-3.84.4.tar.gz(跟mysql相关的建表语句),azkaban-exec-server-3.84.4.tar.gz(executor server的安装包),azkaban-web-server-3.84.4.tar.gz(web server的安装包)上传到hadoop102的/opt/software路径
(2)解压三个安装包到azkaban文件夹下,并更名mv azkaban-exec-server-3.84.4/ azkaban-exec mv azkaban-web-server-3.84.4/ azkaban-web
(3)启动mysql,创建azkaban数据库
mysql> create database azkaban;
(4)创建azkaban用户并赋予权限
# 设置密码有效长度4位及以上 install plugin validate_password soname 'validate_password.so'; mysql> set global validate_password_length=4; # 设置密码策略最低级别 mysql> set global validate_password_policy=0; # 创建Azkaban用户,任何主机都可以访问Azkaban,密码是000000 mysql> CREATE USER 'azkaban'@'%' IDENTIFIED BY '000000'; # 赋予Azkaban用户增删改查权限 mysql> GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION; # 创建Azkaban表,完成后退出MySQL mysql> use azkaban; mysql> source /opt/module/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql mysql> quit;
(5)更改MySQL包大小;防止Azkaban连接MySQL阻塞
sudo vim /etc/my.cnf [mysqld] max_allowed_packet=1024M
(6)重启mysql
sudo systemctl restart mysqld
(7)配置Executor Server:编辑azkaban-exec/conf/azkaban.properties,修改如下信息
# 时区 default.timezone.id=Asia/Shanghai # 指明WebServer所在的主机名 azkaban.webserver.url=http://hadoop102:8081 # 指明访问executor的端口号 executor.port=12321 # 数据库相关参数 database.type=mysql mysql.port=3306 mysql.host=hadoop102 mysql.database=azkaban mysql.user=azkaban mysql.password=azkaban mysql.numconnections=100
(8)配置Executor Server:分发azkaban-exec到hadoop103、hadoop104
(9)配置Executor Server:必须进入到/opt/module/azkaban/azkaban-exec路径(因为配置文件中配置了一些相对路径,这个相对路径是基于启动路径的),分别在三台机器上,启动并激活executor server,如果在/opt/module/azkaban/azkaban-exec目录下出现executor.port文件,说明启动成功,也可以在mysql中的executors表中查看,有相应的主机名和激活状态
[atguigu@hadoop102 azkaban-exec]$ bin/start-exec.sh [atguigu@hadoop103 azkaban-exec]$ bin/start-exec.sh [atguigu@hadoop104 azkaban-exec]$ bin/start-exec.sh [atguigu@hadoop102 azkaban-exec]$ curl -G "hadoop102:12321/executor?action=activate" && echo [atguigu@hadoop103 azkaban-exec]$ curl -G "hadoop103:12321/executor?action=activate" && echo [atguigu@hadoop104 azkaban-exec]$ curl -G "hadoop104:12321/executor?action=activate" && echo
(10)配置Web Server:编辑azkaban-web/conf/azkaban.properties, 修改如下信息
... # 时区 default.timezone.id=Asia/Shanghai ... # 数据库相关信息 database.type=mysql mysql.port=3306 mysql.host=hadoop102 mysql.database=azkaban mysql.user=azkaban mysql.password=azkaban mysql.numconnections=100 ... # StaticRemainingFlowSize:根据各Executor正在排队的任务数选择执行任务的Executor; # MinimumFreeMemory:最小空余内存,默认值是6G,空余内存小于6g的Executor被排除。测试环境,必须将MinimumFreeMemory删除掉,否则它会认为集群资源不够,因为每台机器内存都小于6G,不执行。 # CpuStatus:根据各ExecutorCPU占用情况选择执行任务的Executor; azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
(11)配置Web Server:修改azkaban-web/conf/azkaban-users.xml文件,添加F用户
<azkaban-users> <!-- 用户 --> <user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/> <user password="metrics" roles="metrics" username="metrics"/> <user password="123456" roles="admin" username="F"/> <!-- 角色 --> <role name="admin" permissions="ADMIN"/> <role name="metrics" permissions="METRICS"/> </azkaban-users>
(12)配置Web Server:必须进入到hadoop102的/opt/module/azkaban/azkaban-web路径,启动web server
[atguigu@hadoop102 azkaban-web]$ bin/start-web.sh
(13)访问http://hadoop102:8081,并用F用户登陆
3 Azkaban使用
-
Azkaban使用最重要的是两个文件的编写,一个是azkaban.project,里面的内容是固定的,指明描述工作流的语法版本是2.0(yaml语法),1.0的语法是properties语法,也即键值对形式
azkaban-flow-version: 2.0
另外一个是.flow文件,用于描述工作流,2.0使用的是yaml语法,里面的内容是一个nodes数组,nodes数组的每一个元素代表了一个工作单元job
3.1 HelloWorld案例
-
HelloWorld案例
(1)在windows环境,新建azkaban.project文件,编辑内容如下azkaban-flow-version: 2.0
(2)新建basic.flow文件,内容如下,其中name为job名称、type为job类型(command表示执行作业的方式为命令)、config是job的配置
nodes: - name: jobA type: command config: command: echo "Hello World"
(3)将azkaban.project、basic.flow文件压缩到一个zip文件,文件名称必须是英文。
(4)在webserver页面创建项目,给项目命名和添加项目描述
(5)上传压缩文件
(6)执行任务流,执行产生的图会实时显示执行状况,绿色代表执行成功,红色代表执行失败,蓝色代表执行中
(7)执行结束后可以在Flow Log中查看工作流执行情况,在Job List中查看每个工作单元的执行情况
3.2 作业依赖案例
-
作业依赖案例:JobA和JobB执行完了,才能执行JobC
(1)在windows环境,新建azkaban.project文件,编辑内容如下
azkaban-flow-version: 2.0
(2)新建basic.flow文件,其中dependsOn表示工作单元jobC依赖于工作单元jobA和jobB的完成
nodes: - name: jobC type: command # jobC 依赖 JobA和JobB dependsOn: - jobA - jobB config: command: echo "I’m JobC" - name: jobA type: command config: command: echo "I’m JobA" - name: jobB type: command config: command: echo "I’m JobB"
(3)将azkaban.project和basic.flow进行压缩
(4)在webserver页面创建项目,给项目命名和添加项目描述
(5)上传压缩文件并执行
3.3 自动失败重试案例
-
需求:如果执行任务失败,需要重试3次,重试的时间间隔10000ms
(1)在windows环境,新建azkaban.project文件,编辑内容如下
azkaban-flow-version: 2.0
(2)新建basic.flow文件,其中retries是重试次数、retry.baackoff是重试的时间间隔
nodes: - name: JobA type: command config: command: sh /not_exists.sh retries: 3 retry.backoff: 10000
(3)将azkaban.project和basic.flow进行压缩
(4)在webserver页面创建项目,给项目命名和添加项目描述
(5)上传压缩文件并执行 -
自动失败重试适用于由于其他因素而不是任务本身因素导致的失败,比如网络波动;而如果是由于任务本身因素导致的失败,自动失败重试往往是没有用的, 这种时候只能自己定位到问题产生处,再进行手动失败重试,避免执行已经成功执行的任务单元。
3.4 手动失败重试案例
- 生产环境下,长工作流中的每一个工作单元都有可能失败,当解决问题之后,需要继续执行工作流,执行方法有两种;第一种是在Web Server页面的HIstory中找到执行失败的工作流,点击Execution Id—Preppare Execution会跳过工作流中执行成功的单元;第二种的是找到执行失败的Project,点击Execute Flow,在展示的工作流界面中,右键Disable想要跳过执行的单元
3.5 JavaProcess作业类型案例
-
azkaban的工作单元类型大部分都是command,也可以执行自定义的Java类代码;JavaProcess类型可以运行一个自定义主类方法,type类型为javaprocess,可用的配置为:
Xms:最小堆
Xmx:最大堆
classpath:类路径
java.class:要运行的Java对象,其中必须包含Main方法
main.args:main方法的参数
-
步骤
(1)把编写好的maven项目打包成jar饱(2)在windows环境,新建azkaban.project文件,编辑内容如下
azkaban-flow-version: 2.0
(3)新建basic.flow文件
nodes: - name: test_java type: javaprocess config: Xms: 96M Xmx: 200M java.class: com.atguigu.AzTest
(4)在Web Server创建项目,把jar包、azkaban.properties和basic.flow打包成zip文件后,上传执行
3.6 条件工作流案例
-
条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job的父Job输出的运行时参数构成,也可以使用预定义宏(azkaban预定义好的一些规则)。在这些条件下,用户可以在确定Job执行逻辑时获得更大的灵活性,例如,只要父Job之一成功,就可以运行当前Job。
-
运行时参数案例的基本原理是
(1)父Job将参数写入JOB_OUTPUT_PROP_FILE环境变量所指向的文件
(2)子Job使用 ${jobName:param}来获取父Job输出的参数并定义执行条件
支持的条件运算符:(1)== 等于
(2)!= 不等于
(3)> 大于
(4)>= 大于等于
(5)< 小于
(6)<= 小于等于
(7)&& 与
(8)|| 或
(9)! 非
-
运行时参数案例:JobA执行一个shell脚本。JobB执行一个shell脚本,但JobB不需要每天都执行,而只需要每个周一执行。
(1)新建JobA.sh#!/bin/bash echo "do JobA" wk=`date +%w` echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
(2)新建JobB.sh
#!/bin/bash echo "do JobB"
(3)新建condition.flow
nodes: - name: JobA type: command config: command: sh JobA.sh - name: JobB type: command dependsOn: - JobA config: command: sh JobB.sh condition: ${JobA:wk} == 1
(4)将JobA.sh、JobB.sh、condition.flow和azkaban.project打包成condition.zip
(5)创建condition项目=》上传condition.zip文件=》执行作业=》观察结果 -
Azkaban中预置了几个特殊的判断条件,称为预定义宏。
预定义宏会根据所有父Job的完成情况进行判断,再决定是否执行。可用的预定义宏如下:
(1)all_success: 表示父Job全部成功才执行(默认)
(2)all_done:表示父Job全部完成才执行
(3)all_failed:表示父Job全部失败才执行
(4)one_success:表示父Job至少一个成功才执行
(5)one_failed:表示父Job至少一个失败才执行
-
预定义宏案例:JobA执行一个shell脚本,JobB执行一个shell脚本,JobC执行一个shell脚本,要求JobA、JobB中有一个成功即可执行
(1)新建JobA.sh#!/bin/bash echo "do JobA"
(2)新建JobC.sh
#!/bin/bash echo "do JobC"
(3)新建macro.flow
nodes: - name: JobA type: command config: command: sh JobA.sh - name: JobB type: command config: command: sh JobB.sh - name: JobC type: command dependsOn: - JobA - JobB config: command: sh JobC.sh condition: one_success
(4)JobA.sh、JobC.sh、macro.flow、azkaban.project文件,打包成macro.zip。注意:没有JobB.sh。
(5)创建macro项目=》上传macro.zip文件=》执行作业=》观察结果
3.7 定时执行案例
-
需求:JobA每间隔1分钟执行一次
(1)Azkaban可以定时执行工作流。在执行工作流时候,选择左下角Schedule
(2)右上角注意时区是上海,然后在左面填写具体执行时间,底部会显示接下来执行工作流的最近10个时间
(3)在导航栏里的Scheduling能够查看目前正在调度的任务
3.8 邮件报警案例
-
为了能够及时发现数仓中任务执行产生的问题,Azkaban默认支持进行邮件报警
-
步骤
(1)开启任意邮箱账号的SMTP服务
(2)记住授权码
(3)在azkaban-web节点hadoop102上,编辑/opt/module/azkaban/azkaban-web/conf/azkaban.properties,修改如下内容:保存并重启web-server。#这里设置邮件发送服务器,需要 申请邮箱,且开通stmp服务,以下只是例子 mail.sender=atguigu@126.com mail.host=smtp.126.com mail.user=atguigu@126.com mail.password=用邮箱的授权码
(4)编辑basic.flow
nodes: - name: jobA type: command config: command: echo "This is an email test."
(5)将azkaban.project和basic.flow压缩成email.zip
(6)在Web Server页面创建项目并上传压缩文件,执行工作流时点击Notification可以设置执行失败和成功时通知邮箱
3.9 电话报警案例
- 有时任务执行失败后邮件报警接收不及时,因此可能需要其他报警方式,比如电话报警。如有类似需求,可与第三方告警平台进行集成,例如睿象云。
4 Azkaban多Executor模式注意事项
-
Azkaban多Executor模式是指,在集群中多个节点部署Executor。在这种模式下, Azkaban web Server会根据策略,选取其中一个Executor去执行任务。但这种策略可能会导致有时候任务执行失败,因为可能执行任务所需要的脚本和应用在选中的Executor上是没有的。
为确保所选的Executor能够准确的执行任务,我们须在以下两种方案任选其一,推荐使用方案二
-
方案一:指定特定的Executor(hadoop102)去执行任务。
1)在MySQL中azkaban数据库executors表中,查询hadoop102上的Executor的id。
2)在执行工作流程时,选择Flow Parameters,增加一行,name为useExecutor,value为指定执行的Executor的id方案二:在Executor所在所有节点部署任务所需脚本和应用。