ByteHouse+Apache Airflow:高效简化数据管理流程

news2024/12/24 20:29:35

Apache Airflow 与 ByteHouse 相结合,为管理和执行数据流程提供了强大而高效的解决方案。本文突出了使用 Apache Airflow 与 ByteHouse 的主要优势和特点,展示如何简化数据工作流程并推动业务成功。

主要优势

  1. 可扩展可靠的数据流程:Apache Airflow 提供了一个强大的平台,用于设计和编排数据流程,让您轻松处理复杂的工作流程。搭配 ByteHouse,一款云原生的数据仓库解决方案,您可以高效地存储和处理大量数据,确保可扩展性和可靠性。

  2. 自动化工作流管理:Airflow 的直观界面通过可视化的 DAG(有向无环图)编辑器,使得创建和调度数据工作流程变得容易。通过与 ByteHouse 集成,您可以自动化提取、转换和加载(ETL)过程,减少手动工作量,实现更高效的数据管理。

  3. 简单的部署和管理:Apache Airflow 和 ByteHouse 均设计为简单的部署和管理。Airflow 可以部署在本地或云端,而 ByteHouse 提供完全托管的云原生数据仓库解决方案。这种组合使得数据基础设施的设置和维护变得无缝化。

客户场景

业务场景

在这个客户场景中,一家名为“数据洞察有限公司(假名)”的分析公司,他们将 Apache Airflow 作为数据管道编排工具。他们选择 ByteHouse 作为数据仓库解决方案,以利用其强大的分析和机器学习功能。

数据洞察有限公司在电子商务行业运营,并收集存储在 AWS S3 中的大量客户和交易数据。他们需要定期将这些数据加载到 ByteHouse,并执行各种分析任务,以获得对业务运营的洞察。

数据链路

使用 Apache Airflow,数据洞察有限公司设置了一个基于特定事件或时间表的数据加载管道。例如,他们可以配置 Airflow 在每天的特定时间触发数据加载过程,或者当新的数据文件添加到指定的 AWS S3 存储桶时触发。当触发事件发生时,Airflow 通过从 AWS S3 中检索相关数据文件来启动数据加载过程。它使用适当的凭据和 API 集成确保与 S3 存储桶的安全身份验证和连接。一旦数据从 AWS S3 中获取,Airflow 会协调数据的转换和加载到 ByteHouse 中。它利用 ByteHouse 的集成能力,根据预定义的模式和数据模型高效地存储和组织数据。

成功将数据加载到 ByteHouse 后,数据洞察有限公司可以利用 ByteHouse 的功能进行分析和机器学习任务。他们可以使用 ByteHouse 的类 SQL 语言查询数据,进行复杂的分析,生成报告,并揭示有关客户、销售趋势和产品性能的有意义洞察。

此外,数据洞察有限公司还利用 ByteHouse 的功能创建交互式仪表板和可视化。他们可以构建动态仪表板,显示实时指标,监控关键绩效指标,并与组织中的利益相关者共享可操作的洞察。

最后,数据洞察有限公司利用 ByteHouse 的机器学习功能来开发预测模型、推荐系统或客户细分算法。ByteHouse 提供了必要的计算能力和存储基础设施,用于训练和部署机器学习模型,使数据洞察有限公司能够获得有价值的预测性和规定性洞察。

总结

通过使用 Apache Airflow 作为数据管道编排工具,并将其与 ByteHouse 集成,数据洞察有限公司实现了从 AWS S3 加载数据到 ByteHouse 的流畅自动化流程。他们充分利用 ByteHouse 的强大分析、机器学习和仪表板功能,获得有价值的洞察,并推动组织内的数据驱动。

ByteHouse<>AirFlow 快速入门

先决条件

在您的虚拟/本地环境中安装 pip。在您的虚拟/本地环境中安装 ByteHouse CLI 并登录到 ByteHouse 账户。参考 ByteHouse CLI 以获取安装帮助。macOS 上使用 Homebrew 的示例brew install bytehouse-cli

安装 Apache Airflow

在本教程中,我们使用 pip 在您的本地或虚拟环境中安装 Apache Airflow。了解更多信息,请参阅官方 Airflow 文档。

# airflow需要一个目录,~/airflow是默认目录,
# 但如果您喜欢,可以选择其他位置
#(可选)
export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.1.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

# 例如:3.6
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

如果使用 pip 无法安装,请尝试使用 pip3 install 进行安装。安装完成后,运行命令 airflow info 以获取有关 Airflow 的更多信息。

Airflow 初始化

通过执行以下命令来初始化 Airflow 的 Web 服务器

√# 初始化数据库
airflow db init


airflow users create \
--username admin \
--firstname admin \
--lastname admin \
--role Admin \
--email admin

# 启动Web服务器,默认端口是8080
# 或修改airflow.cfg设置web_server_port
airflow webserver --port 8080

设置好 Web 服务器后,您可以访问 http://localhost:8080/使用先前设置的用户名和密码登录 Airflow 控制台。

在新的终端中,使用以下命令设置 Airflow 调度器。然后,刷新 http://localhost:8080/。

YAML 配置

使用 cd ~/airflow 命令进入 Airflow 文件夹。打开名为 airflow.cfg 的配置文件。添加配置并连接到数据库。默认情况下,您可以使用 SQLite,但也可以连接到 MySQL。

# 默认情况下是SQLite,也可以连接到MySQL
sql_alchemy_conn = mysql+pymysql://airflow:airflow@xxx.xx.xx.xx:8080/airflow

# authenticate = False
# 禁用Alchemy连接池以防止设置Airflow调度器时出现故障 https://github.com/apache/airflow/issues/10055
sql_alchemy_pool_enabled = False

# 存放Airflow流水线的文件夹,通常是代码库中的子文件夹。该路径必须是绝对路径。
dags_folder = /home/admin/airflow/dags

创建有向无环图(DAG)作业

在 Airflow 路径下创建一个名为 dags 的文件夹,然后创建 test_bytehouse.py 以启动一个新的 DAG 作业。

~/airflow
mkdir dags
cd dags
nano test_bytehouse.py

在 test_bytehouse.py 中添加以下代码。该作业可以连接到 ByteHouse CLI,并使用 BashOperator 运行任务、查询或将数据加载到 ByteHouse 中。

from datetime import timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    'test_bytehouse',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['example'],
) as dag:
    
    tImport  = BashOperator(
        task_id='ch_import',
        depends_on_past=False,
        bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml "INSERT INTO korver.cell_towers_1 FORMAT csv INFILE \'/opt/bytehousecli/data.csv\' "',
    )

    tSelect  = BashOperator(
        task_id='ch_select',
        depends_on_past=False,
        bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml -q "select * from korver.cell_towers_1 limit 10 into outfile \'/opt/bytehousecli/dataout.csv\' format csv "'
    )
    
    tSelect >> tImport

在当前文件路径下运行 python test_bytehouse.py 以在 Airflow 中创建 DAG。在浏览器中刷新网页。您可以在 DAG 列表中看到新创建的名为 test_bytehouse 的 DAG。

执行 DAG

在终端中运行以下 Airflow 命令来查看 DAG 列表和 test_bytehouse DAG 中的子任务。您可以分别测试查询执行和数据导入任务。

#打印"test_bytehouse" DAG中的任务列表
[root@VM-64-47-centos dags]# airflow tasks list test_bytehouse
ch_import
ch_select

#打印"test_bytehouse" DAG中任务的层次结构
[root@VM-64-47-centos dags]# airflow tasks list test_bytehouse --tree
<Task(BashOperator): ch_select>
<Task(BashOperator): ch_import>

运行完 DAG 后,查看您的 ByteHouse 账户中的查询历史页面和数据库模块。您应该能够看到查询/加载数据成功执行的结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/686867.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用MASA Stack+.Net 从零开始搭建IoT平台 第五章 使用时序库存储上行数据

目录 前言分析实施步骤时序库的安装解决playload没有时间戳问题代码编写 总结 前言 我们可以将设备上行数据存储到关系型数据库中&#xff0c;我们需要两张带有时间戳的表&#xff08;最新数据表 和 历史数据表&#xff09;&#xff0c;历史数据表存储所有设备上报的数据&…

iptables详解

iptables简介 netfilter/iptables&#xff08;简称为iptables&#xff09;组成Linux平台下的包过滤防火墙&#xff0c;完成封包过滤、封包重定向和网络地址转换&#xff08;NAT&#xff09;等功能。 iptables 规则&#xff08;rules&#xff09;其实就是网络管理员预定义的条…

神通数据库X86架构适配DJANGO317指南

制作神通数据库镜像 1&#xff09;、下载docker.io/centos:7.9.2009镜像&#xff0c;docker pull docker.io/centos:7.9.2009 2)、运行一个容器&#xff0c;docker run -itd --name shentong -p 2003:2003 --privilegedtrue --restartalways -v /sys/fs/cgroup:/sys/fs/cgrou…

万字详解JavaScript手写一个Promise

目录 前言Promise核心原理实现 Promise的使用分析MyPromise的实现在Promise中加入异步操作 实现then方法的多次调用 实现then的链式调用 then方法链式调用识别Promise对象自返回 捕获错误及 then 链式调用其他状态代码补充 捕获执行器错误捕获then中的报错错误与异步状态的链式…

硬盘设备出现“设备硬件出现致命错误,导致请求失败”怎么办?

当我们尝试访问或打开计算机上的硬盘设备&#xff0c;有时候会出现“设备硬件出现致命错误&#xff0c;导致请求失败”的错误提示&#xff0c;这该怎么办呢&#xff1f;下面我们就来了解一下。 出现“设备硬件出现致命错误&#xff0c;导致请求失败”错误的原因有哪些&#xff…

机器学习之SVM支持向量机

目录 经典SVM 软间隔SVM 核SVM SVM分类器应用于人脸识别 SVM优点 SVM缺点 经典SVM 支持向量机&#xff08;Support Vector Machine&#xff0c;SVM&#xff09;是一种二分类模型&#xff0c;其基本思想是在特征空间中找到一个最优的超平面&#xff0c;使得正负样本点到…

数据结构 队列(C语言实现)

绪论 任其事必图其效&#xff1b;欲责其效&#xff0c;必尽其方。——欧阳修&#xff1b;本篇文章主要写的是什么是队列、以及队列是由什么组成的和这些组成接口的代码实现过程。&#xff08;大多细节的实现过程以注释的方式展示请注意查看&#xff09; 话不多说安全带系好&…

Python3,关于请求重试,这次requests库给安排的明明白白。

requests库重试请求 1、引言2、requests库2.1 安装2.2 代码实例2.2.1 重试次数设置2.2.2 重试条件设置2.2.3 超时时间设置 3、总结 1、引言 小屌丝&#xff1a;鱼哥&#xff0c; 你看这是啥&#xff1f; 小鱼&#xff1a;我瞅瞅… 小屌丝&#xff1a;鱼哥&#xff0c;你这眼神…

【计算机视觉】Fast Segment Anything 安装步骤和示例代码解读(含源代码)

文章目录 一、导读二、安装步骤2.1 将存储库克隆到本地2.2 创建 conda 环境2.3 安装软件包2.4 安装 CLIP2.5 下载权重文件2.6 开始使用2.6.1 Everything mode2.6.2 Text prompt2.6.3 Box prompt (xywh)2.6.4 Points prompt 三、示例代码 一、导读 论文地址&#xff1a; https:…

服务器配置与操作

服务器配置与操作 一、连接远程服务器 推荐用xshell 或者 finalshell 或者 winSCP 或者 FileZilla xshell下载地址&#xff1a;https://xshell.en.softonic.com/ 二、服务器配置 2.1 安装JDK 2.1 方法一&#xff1a;在线安装 yum list java* yum -y install java-1.8.0-ope…

【Django | 爬虫 】收集某吧评论集成舆情监控(附源码)

&#x1f935;‍♂️ 个人主页: 计算机魔术师 &#x1f468;‍&#x1f4bb; 作者简介&#xff1a;CSDN内容合伙人&#xff0c;全栈领域优质创作者。 文章目录 一、爬取帖子、二级评论二、构建数据表三、并入项目1. spider代码2. view视图代码3. 优化后台界面3. urls路由 四、定…

第二十三章Java二维数组详解

一、创建二维数组 在 Java 中二维数组被看作数组的数组&#xff0c;即二维数组为一个特殊的一维数组&#xff0c;其每个元素又是一个一维数组。Java 并不直接支持二维数组&#xff0c;但是允许定义数组元素是一维数组的一维数组&#xff0c;以达到同样的效果。声明二维数组的语…

编程规范-控制流程、错误和异常处理

前言&#xff1a; \textcolor{Green}{前言&#xff1a;} 前言&#xff1a; &#x1f49e;这个专栏就专门来记录一下寒假参加的第五期字节跳动训练营 &#x1f49e;从这个专栏里面可以迅速获得Go的知识 今天的笔记是对编程规范的补充&#xff0c;对控制流程、错误和异常处理进行…

Ansys Zemax | 内窥镜物镜系统初始结构的优化提升(下)

系统性能提升 根据上篇的内窥镜系统分析&#xff0c;我们可以从四个方面对内窥镜物镜系统进行优化&#xff1a;元件间距、圆锥系数、MTF 值以及畸变值。点击优化-评价函数编辑器以设置具体的评价函数。&#xff08;联系我们获取文章附件&#xff09; 首先&#xff0c;用三个 CO…

NXP i.MX 8M Plus工业开发板硬件说明书--下册( 四核ARM Cortex-A53 + 单核ARM Cortex-M7,主频1.6GHz)

前 言 本文档主要介绍创龙科技TLIMX8MP-EVM评估板硬件接口资源以及设计注意事项等内容。 创龙科技TLIMX8MP-EVM是一款基于NXP i.MX 8M Plus的四核ARM Cortex-A53 单核ARM Cortex-M7异构多核处理器设计的高性能工业评估板&#xff0c;由核心板和评估底板组成。ARM Cortex-A5…

【AndroidUI设计】Bottom Navigation Activity中Fragment(碎片)的添加和下层导航图标的修改

文章目录 一、引言二、设计1、添加Fragment&#xff08;1&#xff09;确认需求&#xff08;2&#xff09;创建 <1> 方法一&#xff1a;借助工具快速生成 <2> 方法二&#xff1a;视图&#xff08;图层&#xff09;工具 <3> 方法三&#xff1a;手动…

知网G4《语数外学习》简介及投稿邮箱

知网G4教育专刊《语数外学习》简介及投稿邮箱 《语数外学习》全新改版&#xff0c;分别针对初中三个不同年级&#xff0c;每本仍然兼顾语数外三个学科。改版后的《语数外学习》将密切关注课改和中考改革的进程&#xff0c;与教材同步&#xff0c;在帮中学生朋友释疑疑惑、提高…

DOTA-PEG3-azide,1428146-79-5,DOTA三聚乙二醇叠氮,试剂相关研究说明

DOTA-PEG3-azide&#xff0c;DOTA PEG3 N3&#xff0c;DOTA三聚乙二醇叠氮产品结构式&#xff1a; 产品规格&#xff1a; 1.CAS号&#xff1a;1428146-79-5 2.分子式&#xff1a;C24H44N8O10 3.分子量&#xff1a;604.66 4.包装规格&#xff1a;白色固体 &#xff0c;1g、5g、1…

数据库性能测试

目录 前言&#xff1a; 1.引入数据库驱动包 2.添加数据库配置元件 3、JDBCRequest参数化 4、Variablesnames参数使用方法&#xff1a; 前言&#xff1a; 数据库性能测试是测试数据库系统在各种条件下的性能和稳定性的过程。它可以帮助测试人员识别数据库系统的性能瓶颈&a…

30余名「实在RPA·数字员工」在纳爱斯诞生,在618中服务千万消费者!

积水成渊&#xff0c;聚沙成塔&#xff01;谁在世界数字化大势中不断变革自己&#xff1f; 长期蝉联“中国品牌价值评价”日化行业首位&#xff0c;问鼎中国工业“奥斯卡”大奖的“大国品牌”纳爱斯——当仁不让&#xff01; 纳爱斯是日化行业领军企业&#xff0c;业务覆盖家…