1、Azkaban概论
1.1、Azkaban简介
Azkaban 是一个开源的基于 Web 的工作流调度系统,由 LinkedIn 公司开发并维护。它可以帮助用户在大规模数据处理中来管理和调度作业,提供了简单易用、高效可靠的工作流设计和调度功能。
Azkaban 的主要特点包括:
-
丰富的用户界面:提供了基于 Web 的交互式用户界面,使得用户可以方便地创建、编辑和执行工作流,同时也提供了丰富的监控和报警功能。
-
灵活的工作流定义:使用 Azkaban 可以根据需求灵活地定制工作流,支持各种类型的任务(如 Hadoop MapReduce、Spark、Hive、Pig、Shell、Java 等)以及任务之间的依赖、并行和串行执行等功能。
-
高效的任务调度:Azkaban 提供了高效的任务调度功能,可以根据任务优先级、资源限制等情况智能地分配任务资源,实现高效的任务调度和运行。
-
安全可靠:Azkaban 支持多用户和多组权限控制,并提供了各种安全措施(如 SSL/TLS 加密、Kerberos 认证等)来确保数据安全和隐私保护。
1.2、为什么需要工作流调度系统
例如,我们可能有这样一个需求,某个业务系统每天产生20G原始数据,我们每天都要对其进行处理,处理步骤如下所示:
- 通过Hadoop先将原始数据上传到HDFS上(HDFS的操作);
- 使用MapReduce对原始数据进行清洗(MapReduce的操作);
- 将清洗后的数据导入到hive表中(hive的导入操作);
- 对Hive中多个表的数据进行JOIN处理,得到一张hive的明细表(创建中间表);
- 通过对明细表的统计和分析,得到结果报表信息(hive的查询操作);
要完成这个业务系统需要考虑的角度:
- 一个完整的数据分析系统通常都是由大量任务单元组成:shell脚本程序,java程序,mapreduce程序、hive脚本等。
- 各任务单元之间存在时间先后及前后依赖关系。
- 为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行。
像上述需求,特别是在一些涉及多个复杂任务、数据依赖关系复杂、任务之间存在关联和交互等方面,手工调度和管理变得困难且低效。因此,为了保证任务的正常执行,以及简化任务调度管理就显得尤为重要,这也就是需要工作流调度系统的原因。
1.3、常见工作流调度系统
- 简单的任务调度:直接使用Linux的Crontab来定义;
- 复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如Ooize、Azkaban、 Airflow、DolphinScheduler等。
2、Azkaban入门
2.1、Azkaban的部署方式
Azkaban 可以通过以下两种方式进行部署:
-
单机部署:将 Azkaban 作为一个 Java 应用程序,通过在一台服务器上运行一个或多个 Azkaban Web 服务器和 Azkaban Executor 服务器来实现。这种部署方式适合用于学习,或者小型数据处理场景,可以快速部署,但面对大规模数据处理时可能会存在性能瓶颈。
-
分布式部署:将 Azkaban 部署到多个服务器上,可以通过将 Azkaban Web 服务器和 Azkaban Executor 服务器分别分配到不同的服务器上来实现高可用、高性能的工作流调度系统。这种部署方式适合大型数据处理场景,具有更好的扩展性和容错能力。
总之,选择何种部署方式主要取决于实际场景和需求。对于小规模或试验性应用,单机部署可能是更加简便的选择;而对于大规模的生产环境,则需要采用分布式部署以满足高可用、高性能、高扩展性等要求。本文主要使用集群部署。
2.2、Azkaban的集群部署
2.2.1、软件下载
下载地址
2.2.2、软件上传
2.2.3、解压软件,修改名字
[song@hadoop102 software]$ mkdir /opt/model/azkaban
[song@hadoop102 software]$ tar -zxvf azkaban-db-3.84.4.tar.gz -C /opt/model/azkaban/
[song@hadoop102 software]$ tar -zxvf azkaban-exec-server-3.84.4.tar.gz -C /opt/model/azkaban/
[song@hadoop102 software]$ tar -zxvf azkaban-web-server-3.84.4.tar.gz -C /opt/model/azkaban/
[song@hadoop102 azkaban]$ mv azkaban-exec-server-3.84.4/ azkaban-exec
[song@hadoop102 azkaban]$ mv azkaban-web-server-3.84.4/ azkaban-web
2.2.4、配置MySQL
2.2.4.1、正常安装即可,电脑上已经安装了,这就不多介绍了
2.2.4.2、登陆MySQL,创建Azkaban数据库
#创建Azkaban用户,任何主机都可以访问Azkaban,密码是azkaban
CREATE USER 'azkaban'@'%' IDENTIFIED BY 'azkaban';
2.2.4.3、创建azkaban用户并赋予权限
#赋予azkaban用户增删改查权限
GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
2.2.4.4、更改MySQL包大小;防止Azkaban连接MySQL阻塞
在[mysqld]下面加一行max_allowed_packet=1024M
[mysqld]
max_allowed_packet=1024M
2.2.4.5、重启MySQL
sudo systemctl restart mysqld
2.2.4.6、加载SQL
mysql> source /opt/model/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql
2.2.5、配置Executor Server
Executor Server | Web |
---|---|
hadoop102,hadoop103,hadoop104 | hadoop102 |
- 编辑azkaban.properties
[song@hadoop102 conf]$ vim azkaban.properties
修改内容
#...
default.timezone.id=Asia/Shanghai
#...
azkaban.webserver.url=http://hadoop102:8081
executor.port=12321
#...
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=azkaban
mysql.password=azkaban
...
- 同步azkaban-exec到所有节点
- 必须进入到
/opt/model/azkaban/azkaban-exec
路径,分别在三台机器上,启动executor server
bin/start-exec.sh
注意:如果在/opt/model/azkaban/azkaban-exec
目录下出现executor.port文件,说明启动成功
- 激活executor
[song@hadoop102 azkaban-exec]$ curl -G "hadoop102:12321/executor?action=activate" && echo
[song@hadoop103 azkaban-exec]$ curl -G "hadoop103:12321/executor?action=activate" && echo
[song@hadoop104 azkaban-exec]$ curl -G "hadoop104:12321/executor?action=activate" && echo
如果三台机器都出现{"status":"success"}
提示,则表示激活成功
2.2.6、配置Web Server
Azkaban Web Server处理项目管理,身份验证,计划和执行触发。
- 编辑azkaban-web下的azkaban.properties
vim azkaban.properties
修改内容为
...
default.timezone.id=Asia/Shanghai
...
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100
...
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
- StaticRemainingFlowSize:正在排队的任务数;
- CpuStatus:CPU占用情况
- MinimumFreeMemory 过滤器会检查 executor 主机空余内存是否会大于 6G,如果不足 6G,则 web-server 不会将任务交由该主机执行
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
- 修改azkaban-users.xml文件,添加song用户
- 必须进入到hadoop102的/opt/model/azkaban/azkaban-web路径,启动web server
[song@hadoop102 azkaban-web]$ bin/start-web.sh
- 访问http://hadoop102:8081,并用song用户登陆
3、案例实操
3.1、HelloWorld案例
- 在windows环境,新建azkaban.project文件,编辑内容如下
azkaban-flow-version: 2.0
注意:该文件作用,是采用新的Flow-API方式解析flow文件。
- 新建basic.flow文件,内容如下
nodes:
- name: jobA
type: command
config:
command: echo "Hello World"
- Name:job名称
- Type:job类型。command表示你要执行作业的方式为命令
- Config:job配置
-
将azkaban.project、basic.flow文件压缩到一个zip文件,文件名称必须是英文。
-
在WebServer新建项目:hadoop102:8081/index
-
给项目名称命名和添加项目描述
-
文件上传
-
执行任务流
- 在日志中,查看运行结果
3.2、任务作业依赖案例
需求:JobA和JobB执行完了,才能执行JobC
- 修改basic.flow为如下内容
nodes:
- name: jobC
type: command
# jobC 依赖 JobA和JobB
dependsOn:
- jobA
- jobB
config:
command: echo "I’m JobC"
- name: jobA
type: command
config:
command: echo "I’m JobA"
- name: jobB
type: command
config:
command: echo "I’m JobB"
- 将修改后的basic.flow和azkaban.project压缩成second.zip文件
- 重复HelloWorld中的后续步骤
3.3、自动失败重试案例
需求:如果执行任务失败,需要重试3次,重试的时间间隔10000ms
- 编译配置流
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
retries: 3
retry.backoff: 10000
参数说明:
- retries:重试次数
- retry.backoff:重试的时间间隔
- 将修改后的basic.flow和azkaban.project压缩成four.zip文件
- 重复HelloWorld后续步骤,步骤略。
- 执行并观察到一次失败+三次重试
也可以在Flow全局配置中添加任务失败重试配置,此时重试配置会应用到所有Job。
案例如下:
config:
retries: 3
retry.backoff: 10000
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
3.4、手动失败重试案例
需求:JobA=》JobB(依赖于A)=》JobC=》JobD=》JobE=》JobF。生产环境,任何Job都有可能挂掉,可以根据需求执行想要执行的Job。
具体步骤:
- 编译配置流
nodes:
- name: JobA
type: command
config:
command: echo "This is JobA."
- name: JobB
type: command
dependsOn:
- JobA
config:
command: echo "This is JobB."
- name: JobC
type: command
dependsOn:
- JobB
config:
command: echo "This is JobC."
- name: JobD
type: command
dependsOn:
- JobC
config:
command: echo "This is JobD."
- name: JobE
type: command
dependsOn:
- JobD
config:
command: echo "This is JobE."
- name: JobF
type: command
dependsOn:
- JobE
config:
command: echo "This is JobF."
- 将修改后的basic.flow和azkaban.project压缩成five.zip文件
- 重复HelloWorld后续步骤。
Enable和Disable下面都分别有如下参数:
- Parents:该作业的上一个任务
- Ancestors:该作业前的所有任务
- Children:该作业后的一个任务
- Descendents:该作业后的所有任务
- Enable All:所有的任务
- 可以根据需求选择性执行对应的任务。
3.5、JavaProcess作业类型案例
JavaProcess类型可以运行一个自定义主类方法,type类型为javaprocess,可用的配置为:
- Xms:最小堆
- Xmx:最大堆
- classpath:类路径
- java.class:要运行的Java对象,其中必须包含Main方法
- main.args:main方法的参数
案例:
- 新建一个azkaban的maven工程
- 创建包名:com.song
- 创建AzTest类
package com.song;
public class AzTest {
public static void main(String[] args) {
System.out.println("This is for testing!");
}
}
- 打包成jar包test_demo-1.0-SNAPSHOT.jar
- 新建testJava.flow,内容如下
nodes:
- name: test_java
type: javaprocess
config:
Xms: 96M
Xmx: 200M
java.class: com.song.AzTest
- 将Jar包、flow文件和project文件打包成javatest.zip
- 创建项目=》上传javatest.zip =》执行作业=》观察结果
3.6、条件工作流案例
条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job的父Job输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定Job执行逻辑时获得更大的灵活性,例如,只要父Job之一成功,就可以运行当前Job。
- 基本原理
- 父Job将参数写入JOB_OUTPUT_PROP_FILE环境变量所指向的文件
- 子Job使用 ${jobName:param}来获取父Job输出的参数并定义执行条件
- 支持的条件运算符:
(1)== 等于
(2)!= 不等于
(3)> 大于
(4)>= 大于等于
(5)< 小于
(6)<= 小于等于
(7)&& 与
(8)|| 或
(9)! 非
3.6.1、运行时参数案例
-
需求:JobA执行一个shell脚本,JobB执行一个shell脚本,但JobB不需要每天都执行,而只需要每个周一执行。
-
新建JobA.sh
#!/bin/bash
echo "do JobA"
wk=`date +%w`
echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
- 新建JobB.sh
#!/bin/bash
echo "do JobB"
4 新建condition.flow
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
dependsOn:
- JobA
config:
command: sh JobB.sh
condition: ${JobA:wk} == 1
- 将JobA.sh、JobB.sh、condition.flow和azkaban.project打包成condition.zip
- 创建condition项目=》上传condition.zip文件=》执行作业=》观察结果
- 按照我们设定的条件,JobB会根据当日日期决定是否执行。
3.6.2、预定义宏案例
Azkaban中预置了几个特殊的判断条件,称为预定义宏。预定义宏会根据所有父Job的完成情况进行判断,再决定是否执行。可用的预定义宏如下:
- all_success: 表示父Job全部成功才执行(默认)
- all_done:表示父Job全部完成才执行
- all_failed:表示父Job全部失败才执行
- one_success:表示父Job至少一个成功才执行
- one_failed:表示父Job至少一个失败才执行
- 需求:
JobA执行一个shell脚本
JobB执行一个shell脚本
JobC执行一个shell脚本,要求JobA、JobB中有一个成功即可执行 - 新建JobA.sh
#!/bin/bash
echo "do JobA"
- 新建JobC.sh
#!/bin/bash
echo "do JobC"
- 新建macro.flow
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
config:
command: sh JobB.sh
- name: JobC
type: command
dependsOn:
- JobA
- JobB
config:
command: sh JobC.sh
condition: one_success
- JobA.sh、JobC.sh、macro.flow、azkaban.project文件,打包成macro.zip。
注意:没有JobB.sh。 - 创建macro项目=》上传macro.zip文件=》执行作业=》观察结果
3.7、定时执行案例
需求:JobA每间隔1分钟执行一次;
具体步骤:
- Azkaban可以定时执行工作流。在执行工作流时候,选择左下角Schedule
- 右上角注意时区是上海,然后在左面填写具体执行事件,填写的方法和crontab配置定时任务规则一致。
3.8、邮件报警案例
3.8.1、注册邮箱
-
申请注册一个126邮箱
-
点击邮箱账号=》账号管理
-
开启SMTP服务
-
一定要记住授权码
3.8.2、默认邮件报警案例
Azkaban默认支持通过邮件对失败的任务进行报警,配置方法如下:
1)在azkaban-web节点hadoop102上,编辑/opt/model/azkaban/azkaban-web/conf/azkaban.properties
,修改如下内容:
[song@hadoop102 azkaban-web]$ vim /opt/model/azkaban/azkaban-web/conf/azkaban.properties
添加如下内容:
#这里设置邮件发送服务器,需要 申请邮箱,切开通stmp服务,以下只是例子
mail.sender=xxx@126.com
mail.host=smtp.126.com
mail.user=xxx@126.com
mail.password=用邮箱的授权码
- 保存并重启web-server。
[song@hadoop102 azkaban-web]$ bin/shutdown-web.sh
[song@hadoop102 azkaban-web]$ bin/start-web.sh
- 编辑basic.flow
nodes:
- name: jobA
type: command
config:
command: echo "This is an email test."
- 将azkaban.project和basic.flow压缩成email.zip
- 创建工程=》上传文件=》执行作业=》查看结果
- 观察邮箱,发现执行成功或者失败的邮件
3.9、电话报警案例
3.9.1、第三方告警平台集成
有时任务执行失败后邮件报警接收不及时,因此可能需要其他报警方式,比如电话报警。如有类似需求,可与第三方告警平台进行集成,例如睿象云。
- 进入睿象云官网注册账号并登录,官网地址:https://www.aiops.com/
- 集成告警平台,使用Email集成
- 获取邮箱地址,后边需将报警信息发送至该邮箱
- 配置分派策略
- 配置通知策略
- 测试,执行上一个邮件通知的案例,将通知对象改为刚刚集成第三方平台时获取的邮箱。
3.10、Azkaban多Executor模式注意事项
Azkaban多Executor模式是指,在集群中多个节点部署Executor。在这种模式下, Azkaban web Server会根据策略,选取其中一个Executor去执行任务。
为确保所选的Executor能够准确的执行任务,我们须在以下两种方案任选其一,推荐使用方案二。
3.10.1、方案一:指定特定的Executor(hadoop102)去执行任务。
- 在MySQL中azkaban数据库executors表中,查询hadoop102上的Executor的id。
mysql> use azkaban;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select * from executors;
+----+-----------+-------+--------+
| id | host | port | active |
+----+-----------+-------+--------+
| 1 | hadoop103 | 35985 | 1 |
| 2 | hadoop104 | 36363 | 1 |
| 3 | hadoop102 | 12321 | 1 |
+----+-----------+-------+--------+
3 rows in set (0.00 sec)
- 在执行工作流程时加入useExecutor属性,如下