Apache Airflow (十四) :Airflow分布式集群搭建及测试

news2025/1/11 21:49:07

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. 节点规划

2. airflow集群搭建步骤

3. 初始化Airflow

4. 创建管理员用户信息

​​​​​​​5. 配置Scheduler HA

​​​​​​​6. 启动Airflow集群

​​​​​​​7. 访问Airflow 集群WebUI

8. 测试Airflow HA


1. 节点规划

节点IP

节点名称

节点角色

运行服务

192.168.179.4

node1

Master1

webserver,scheduler

192.168.179.5

node2

Master2

websever,scheduler

192.168.179.6

node3

Worker1

worker

192.168.179.7

node4

Worker2

worker

2. airflow集群搭建步骤

1) 在所有节点安装python3.7

参照单节点安装Airflow中安装anconda及python3.7。

2) 在所有节点上安装airflow

  • 每台节点安装airflow需要的系统依赖
yum -y install mysql-devel gcc gcc-devel python-devel gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib 
  • 每台节点配置airflow环境变量
vim /etc/profile

export AIRFLOW_HOME=/root/airflow



#使配置的环境变量生效

source /etc/profile
  • 每台节点切换airflow环境,安装airflow,指定版本为2.1.3
(python37)   conda activate python37

(python37) pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

默认Airflow安装在$ANCONDA_HOME/envs/python37/lib/python3.7/site-packages/airflow目录下。配置了AIRFLOW_HOME,Airflow安装后文件存储目录在AIRFLOW_HOME目录下。可以每台节点查看安装Airflow版本信息:

(python37)  airflow version

2.1.3
  • 在Mysql中创建对应的库并设置参数

aiflow使用的Metadata database我们这里使用mysql,在node2节点的mysql中创建airflow使用的库及表信息。

CREATE DATABASE airflow CHARACTER SET utf8;

create user 'airflow'@'%' identified by '123456';

grant all privileges on airflow.* to 'airflow'@'%';

flush privileges;

在mysql安装节点node2上修改”/etc/my.cnf”,在[mysqld]下添加如下内容:

[mysqld]

explicit_defaults_for_timestamp=1

以上修改完成“my.cnf”值后,重启Mysql即可,重启之后,可以查询对应的参数是否生效:

#重启mysql

[root@node2 ~]# service mysqld restart



#重新登录mysql查询

mysql> show variables like 'explicit_defaults_for_timestamp';

  • 每台节点配置Airflow airflow.cfg文件

修改AIRFLOW_HOME/airflow.cfg文件,确保所有机器使用同一份配置文件,在node1节点上配置airflow.cfg,配置如下:

[core]

dags_folder = /root/airflow/dags



#修改时区

default_timezone = Asia/Shanghai



#配置Executor类型,集群建议配置CeleryExecutor

executor = CeleryExecutor



# 配置数据库

sql_alchemy_conn=mysql+mysqldb://airflow:123456@node2:3306/airflow?use_unicode=true&charset=utf8



[webserver]

#设置时区

default_ui_timezone = Asia/Shanghai



[celery]

#配置Celery broker使用的消息队列

broker_url = redis://node4:6379/0

#配置Celery broker任务完成后状态更新使用库

result_backend = db+mysql://root:123456@node2:3306/airflow

将node1节点配置好的airflow.cfg发送到node2、node3、node4节点上

3. 初始化Airflow

1) 每台节点安装需要的python依赖包

初始化Airflow数据库时需要使用到连接mysql的包,执行如下命令来安装mysql对应的python包。

​
(python37) #  pip install mysqlclient -i Simple Index

​

2) 在node1上初始化Airflow 数据库

(python37) [root@node1 airflow]# airflow db init

初始化之后在MySQL airflow库下会生成对应的表。

4. 创建管理员用户信息

在node1节点上执行如下命令,创建操作Airflow的用户信息:

airflow users create \

    --username airflow \

    --firstname airflow \

    --lastname airflow \

    --role Admin \

    --email xx@qq.com

执行完成之后,设置密码为“123456”并确认,完成Airflow管理员信息创建。

​​​​​​​5. 配置Scheduler HA

1) 下载failover组件

登录https://github.com/teamclairvoyant/airflow-scheduler-failover-controller下载 airflow-scheduler-failover-controller 第三方组件,将下载好的zip包上传到node1 “/software”目录下。

在node1节点安装unzip,并解压failover组件:

(python37) [root@node1 software]# yum -y install unzip

(python37) [root@node1 software]# unzip ./airflow-scheduler-failover-controller-master.zip

2) 使用pip进行安装failover需要的依赖包

需要在node1节点上安装failover需要的依赖包。

(python37) [root@node1 software]# cd /software/airflow-scheduler-failover-controller-master

(python37) [root@node1 airflow-scheduler-failover-controller-master]# pip install -e .

3) node1节点初始化failover

(python37) [root@node1 ~]# scheduler_failover_controller init

Adding Scheduler Failover configs to Airflow config file...

Finished adding Scheduler Failover configs to Airflow config file.

Finished Initializing Configurations to allow Scheduler Failover Controller to run. Please update the airflow.cfg with your desired configurations.

注意:初始化airflow时,会向airflow.cfg配置中追加配置,因此需要先安装 airflow 并初始化。

4) 修改airflow.cfg

首先修改node1节点的AIRFLOW_HOME/airflow.cfg

[scheduler_failover]

# 配置airflow Master节点,这里配置为node1,node2,两节点需要免密

scheduler_nodes_in_cluster = node1,node2



#在1088行,特别注意,需要去掉一个分号,不然后期自动重启Scheduler不能正常启动

airflow_scheduler_start_command = export AIRFLOW_HOME=/root/airflow;nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &

配置完成后,可以通过以下命令进行验证Airflow Master节点:

(python37) [root@node1 airflow]# scheduler_failover_controller test_connection

Testing Connection for host 'node1'

(True, ['Connection Succeeded', ''])

Testing Connection for host 'node2'

(True, ['Connection Succeeded\n'])

将node1节点配置好的airflow.cfg同步发送到node2、node3、node4节点上:

(python37) [root@node1 ~]# cd /root/airflow/

(python37) [root@node1 airflow]# scp airflow.cfg node2:`pwd`

(python37) [root@node1 airflow]# scp airflow.cfg node3:`pwd`

(python37) [root@node1 airflow]# scp airflow.cfg node4:`pwd`

​​​​​​​6. 启动Airflow集群

1) 在所有节点安装启动Airflow依赖的python包

(python37) [root@node1 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

(python37) [root@node2 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

(python37) [root@node3 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

(python37) [root@node4 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

2) 在Master1节点(node1)启动相应进程

#默认后台启动可以使用-D ,这里使用-D有时不能正常启动Airflow对应进程

airflow webserver

airflow scheduler

3) 在Master2节点(node2)启动相应进程

airflow webserver

4) 在Worker1(node3)、Worker2(node4)节点启动Worker

在node3、node4节点启动Worker:

(python37) [root@node3 ~]# airflow celery worker

(python37) [root@node4 ~]# airflow celery worker

5) 在node1启动Scheduler HA

(python37) [root@node1 airflow]# nohup scheduler_failover_controller start > /root/airflow/logs/scheduler_failover/scheduler_failover_run.log &

​​​​​​​

至此,Airflow高可用集群搭建完成。

​​​​​​​7. 访问Airflow 集群WebUI

浏览器输入node1:8080,查看Airflow WebUI:

8. 测试Airflow HA

1) 准备shell脚本

Airflow集群所有节点{AIRFLOW_HOME}目录下创建dags目录,准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bash

dt=$1

echo "==== execute first shell ===="

echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bash

dt=$1

echo "==== execute second shell ===="

echo "---- second : time is ${dt}"

2) 编写airflow python 配置

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_sh',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=BashOperator(
    task_id='first',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag = dag
)

second=BashOperator(
    task_id='second',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag=dag
)

first >> second

将以上内容写入execute_shell.py文件,上传到所有Airflow节点{AIRFLOW_HOME}/dags目录下。

3) 重启Airflow,进入Airflow WebUI查看对应的调度

重启Airflow之前首先在node1节点关闭webserver ,Scheduler进程,在node2节点关闭webserver ,Scheduler进程,在node3,node4节点上关闭worker进程。

如果各个进程是后台启动,查看后台进程方式:

(python37) [root@node1 dags]# ps aux |grep webserver

(python37) [root@node1 dags]# ps aux |grep scheduler

(python37) [root@node2 dags]# ps aux |grep webserver

(python37) [root@node2 dags]# ps aux |grep scheduler

(python37) [root@node3 ~]# ps aux|grep "celery worker"

(python37) [root@node4 ~]# ps aux|grep "celery worker"

找到对应的启动命令对应的进程号,进行kill。

重启后进入Airflow WebUI查看任务:

点击“success”任务后,可以看到脚本执行成功日志:

​​​​​​​4) 测试Airflow HA

当我们把node1节点的websever关闭后,可以直接通过node2节点访问airflow webui:

在node1节点上,查找“scheduler”进程并kill,测试scheduler HA 是否生效:

(python37) [root@node1 ~]# ps aux|grep scheduler

root      23744  0.9  3.3 326940 63028 pts/2    S    00:08   0:02 airflow scheduler -- DagFileProcessorManager

#kill 掉scheduler进程

(python37) [root@node1 ~]# kill -9 23744



访问webserver webui

在node1节点查看scheduler_failover_controller进程日志中有启动schudler动作,注意:这里是先从node1启动,启动不起来再从其他Master 节点启动Schduler。


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1261875.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jquery 地址四级联级显示 不默认选择

代码效果 <body class"bgca"><img src"./files/joinTooBg.png" style"width: 100%;object-fit: cover;" alt""><!--填写申请资料--><section><div class"zi-liao"><h3 class"zong-h…

Apache Flink(一):Apache Flink是什么?

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录

搜维尔科技:8K!光线追踪!超大视场角!Varjo震撼发布XR-4系列

VR/XR硬件厂商Varjo发布了新一代XR-4系列头显—XR-4、XR-4 Focal Edition和XR-4 Secure Edition&#xff0c;升级后的头显提供了与自然视觉几乎无异的虚拟和混合现实体验。 划重点-新一代XR-4系列头显小编总结如下&#xff1a; 8K分辨率&#xff0c;高达120超大视场角&#xf…

Jmeter-分布式压测(远程启动服务器,windows)

1 前提条件 JDK已部署&#xff0c;版本一致Jmeter已部署&#xff0c;版本一致多台服务器连接的同一网络(例如&#xff1a;同一wifi)防火墙处于关闭状态&#xff08;或者对应默认端口处于开放状态&#xff09;虚拟网络适配器都处于关闭状态查找到每一台服务器的IP 2 主服务器配…

从四个典型场景看如何将数据集成“用到实处”

一、数据集成概念 数据集成是指将来自不同数据源的数据整合到一个统一的数据存储中&#xff0c;并确保这些数据能够互相关联、交换和共享的过程。在数据集成的过程中&#xff0c;数据通常需要经过清洗、转换和统一格式化等步骤&#xff0c;以确保数据的一致性、完整性和可用性…

第71讲:MySQL锁机制详解:表级锁、元数据锁和意向锁的全面解析与实践指南

MySQL中的表级锁 文章目录 MySQL中的表级锁1.MySQL中表级锁的概念2.表锁的概念以及基本使用2.1.表锁的分类以及概念2.2.表锁的使用语法2.3.表共享读锁的基本使用2.4.表独占写锁的基本使用 3.元数据锁的概念以及基本使用3.1.元数据锁的概念3.2.常见的SQL操作所对应的元数据锁3.3…

Java小游戏 王者荣耀

GameFrame类 所需图片&#xff1a; package 王者荣耀;import java.awt.*; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.awt.event.KeyAdapter; import java.awt.event.KeyEvent; import java.io.File; import java.util.ArrayLis…

【C++】什么是模板?怎样使用模板?

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 目录 前言 1.函数模板 1.1函数模板概念 1.2函数…

ecology主表checkBox按钮给明细表浏览按钮赋值

需求&#xff1a; 通过上面图片红框中的单选按钮&#xff0c;将对应的负责人赋值到明细表的负责人位置。 核心代码&#xff1a; <script> jQuery(document).ready(function(){WfForm.bindFieldChangeEvent("field17616", function(obj,id,value){ SelectVa…

JAVA进阶之路JVM-2:类加载机制,类的生命周期,类加载过程,类加载时机,类加载器,双亲委派模型,对象创建过程

JVM类加载机制 类加载 ​ 在JVM虚拟机实现规范中&#xff0c;通过ClassLoader类加载把*.class字节码文件&#xff08;文件流&#xff09;加载到内存&#xff0c;并对字节码文件内容进行验证&#xff0c;准备&#xff0c;解析和初始化&#xff0c;最终形成可以被虚拟机直接使用…

浙江启用无人机巡山护林模式,火灾扑救效率高

为了保护天然的森林资源&#xff0c;浙江当地林业部门引入了一种创新技术&#xff1a;林业无人机。这些天空中的守护者正在重新定义森林防火和护林工作的方式。 当下正值天气干燥的季节&#xff0c;这些无人机开始了它们的首次大规模任务。它们在指定的林区内自主巡逻&#xff…

Win7 SP1 x64 安装 Python 出错解决方法

1 双击安装 python-3.7.9.exe &#xff0c;提示出错&#xff0c;log.file 显示需要 KB2533623&#xff0c;但在Microsoft Update Catalog 没有搜到&#xff0c;实验 KB4474419 也可以。 2 Microsoft Update Catalog 搜索 KB4474419 并下载&#xff0c;安装&#xff0c;重启电脑…

聊聊如何进行代码混淆

​ 前言什么是代码混淆代码混淆&#xff0c;是指将计算机程序的代码&#xff0c;转换成一种功能上等价&#xff0c;但是难于阅读和理解的形式的行为。 代码混淆常见手段1、名称混淆 将有意义的类&#xff0c;字段、方法名称更改为无意义的字符串。生成的新名称越短&#xff0…

Windows系统下搭建PXE Server

在给一台服务器初始安装OS时一般有以下几种方式&#xff1a; 1、通过BMC挂载iso镜像来安装&#xff1b; 2、通过U盘启动来安装&#xff1b; 3、通过网络启动来安装&#xff1b; 方式1和方式2只能一台一台地进行&#xff0c;且需要有键盘和显示器&#xff0c;效率低下&#xff…

HTTP/3 为什么正迅速崛起

超文本传输协议&#xff08;HTTP&#xff09;作为互联网的基石&#xff0c;一直在网页加载、视频流传输、应用获取数据等方方面面发挥重要作用。 去年&#xff0c;负责定义互联网技术的互联网工程任务组&#xff08;IETF&#xff09;将该协议的最新版本 HTTP/3 定为标准。在此…

【深度学习】gan网络原理生成对抗网络

【深度学习】gan网络原理生成对抗网络 GAN的基本思想源自博弈论你的二人零和博弈&#xff0c;由一个生成器和一个判别器构成&#xff0c;通过对抗学习的方式训练&#xff0c;目的是估测数据样本的潜在分布并生成新的数据样本。 1.下载数据并对数据进行规范 transform tran…

Vue基础入门(三):Vue3的使用

Vue3的使用 一、首页案例修改 修改首页的信息&#xff1a;是在之前介绍的HelloWorld.vue文件中进行内容的修改。 页面展示效果&#xff1a; 此时就看到了我们新添加的文字了&#xff01; 同样的我们开发代码的时候只需要修改了项目中的内容然后保存就会自动刷新的浏览器&…

初刷leetcode题目(8)——数据结构与算法

&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️Take your time ! &#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️…

一种新的基于物理的AlGaN/GaN HFET紧凑模型

标题&#xff1a;A new physics-based compact model for AlGaN/GaN HFETs (IEEE MTT-S International Microwave Symposium) 摘要 摘要 - 针对AlGaN/GaN HFET&#xff0c;提出了一种无拟合参数的物理解析模型。对于非饱和操作&#xff0c;建立了两个接入区和栅极下方I-V特性的…

ModuleNotFoundError: No module named ‘mdtex2html‘ module已经安装还是报错,怎么办?

用streamlit运行ChatGLM/basic_model/web_demo.py的时候&#xff0c;出现了module not found&#xff1a; ModuleNotFoundError: No module named mdtex2html Traceback: File "/home/haiyue/.local/lib/python3.10/site-packages/streamlit/runtime/scriptrunner/script…