【Airflow】构建爬虫任务系统

news2024/11/18 15:47:01

爬虫脚本太多了需要进行管理一下,领导决定使用airflow
我了解了一下这个平台是用来做任务调度。
是一个ETL工具
ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程
在这里插入图片描述
这里是一个github的地址

https://github.com/apache/airflow

这里是官方文档

https://airflow.apache.org/docs/apache-airflow/stable/index.html

这里是我学习的视频资料

https://www.bilibili.com/video/BV19f4y1V7UG/
博主的视频有一些老了,不过也是可以学习的。

先看一下成果吧
在这里插入图片描述

这里是首页记录了,我得dags任务
dags就相当于我得一个爬虫任务,
在这里插入图片描述
我们进入这个dag里面,可以看到
有4个小任务(task)和一task执行的状态,还有一些基本信息。
在这里插入图片描述
可以看每一个task的日志,如果发生问题也可以更方便的定位问题的。
每个dag都有很多配置,比如定时任务,失败重试,自动拉起,报警邮件等功能。
以后再也不用上服务器去看日志了
嘻嘻嘻嘻嘻

conda环境安装

这里需要一个干净的环境,我使用的miniconda
来到 conda网站

https://docs.conda.io/projects/miniconda/en/latest/miniconda-other-installer-links.html

在这里插入图片描述
这里选择linux 的 我是用的是python3.8
然后复制这个链接地址。
在这里插入图片描述
然后我们到服务器上将这个包下好。
执行命令

wget https://repo.anaconda.com/miniconda/Miniconda3-py38_23.9.0-0-Linux-x86_64.sh

在这里插入图片描述
下好之后。
安装好之后,执行

bash Miniconda3-py38_23.9.0-0-Linux-x86_64.sh

然后一直yes 就ok
在这里插入图片描述
conda这里就装好了
在这里插入图片描述
也带着python。然后我们创建一个环境

conda create -n airflow  # 创建环境
conda activate airflow   # 激活环境

这样环境就差不多了
安装一下airflow

 conda install apache-airflow

在这里插入图片描述
这里会依赖很多包
在这里插入图片描述
执行airflow命令,出现这些,说明airflow已经装好了。

配置airflow

找到你得airflow路径,
里面会有一个airflow.cfg文件
在这里插入图片描述
这里是配置dags的文件路径
在这里插入图片描述
这里默认使用的是sqlite数据库,
后续也可以改成mysql

https://blog.csdn.net/qq_43439214/article/details/129898191
这个博主的文章很不错,将airflow的数据库换成了mysql

我这里就先用默认的了,
执行命令

airflow db init 

在这里插入图片描述
出现下面内容说明数据库已经初始化好了。
执行下面命令进行创建用户

airflow users create --username zhang --firstname zhang --lastname zhang --role Admin --email  airflow@example.com

并为其用户设置密码。
配置好了之后
我们开一个会话
在这里插入图片描述
来通过webserver开启 web服务,这里在要访问你得公网地址默认端口是8080
在这里插入图片描述
到这里来到了ui界面输入,你刚刚创建的用户名和密码。
在这里插入图片描述
来到这里了。
这样我们的配置基本就没问题了。

举个例子

来举一个例子,
来写一个dags看一下。

from __future__ import annotations
# [START tutorial]
# [START import_module]
import os
import sys
from datetime import datetime, timedelta
from textwrap import dedent
from airflow.operators.python import PythonOperator, BranchPythonOperator


# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# [END import_module]
# [START instantiate_dag]


def first_task():
    print("这里是first_task")
    print("这里是first_task")
    print("这里是first_task")
    print("这里是first_task")
    print("这里是first_task")


def second_task():
    print("这里是second_task")
    print("这里是second_task")
    print("这里是second_task")
    print("这里是second_task")
    print("这里是second_task")


with DAG(
    dag_id="test_airflow",
    default_args={
        'owner': "guapisansan",
        "depends_on_past": True,
        "email": ["177664833@qq.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    # [END default_args]
    description="测试一把",
    schedule_interval='0 15 * * *',  # 设置调度间隔为每天下午六点,
    catchup=False,
    start_date=datetime(2023, 10, 23),
    tags=["test"],
) as dag:
    first_task_op = PythonOperator(
        task_id="first_task",
        python_callable=first_task
    )

    second_task_op = PythonOperator(
        task_id="second_task",
        python_callable=second_task
    )

    first_task_op >> second_task_op


我这里给了一段非常简单的代码。
大概意思就是有两个task,分别是first和second 他们对应的也分别是两个python函数
配置了一些参数,如 description dag的简介, 还有一些参数,可以在官方文档查询。

记不记得我们的airflow.cfg文件里面有一个这个参数
在这里插入图片描述
这里就是我们存放所有 dags的路径。一定要是绝对路径
将刚刚写的脚本放到这个路径下,
然后我们在执行命令

airflow dags list

在这里插入图片描述
这里存在我刚刚的创建的test_airflow
这时候我们在创建一个会话
执行命令

airflow scheduler

这是一个调度命令
这时候我们刷新刚刚的web页面
在这里插入图片描述
发现我们dags已经挂到ui页面上了
我们点进去看看
在这里插入图片描述
看到了我们的子任务task 出现了,
现在需要手动触发一下看看结果。
在这里插入图片描述
点击这个trigger按钮
在这里插入图片描述
可以看到这两个task都执行成功了。
右边的框里面有执行时间,还有一些任务的基础信息,
在这里插入图片描述
在这里插入图片描述
点击小图片查看任务执行的日志,
发现我们print打印出来了,
我这里只是举一个小例子先跑起来,可以根据自己的脚本进行配置等等。
airflow是一个非常好用的任务调度工具,相对于数据处理更好,我们当作爬虫管理系统也可以用。
用它将分散的脚本整理,更加方便观察和调度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1129156.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

手机桌面待办事项APP推荐

每天,我们每个人都面临着繁琐的事务和任务,而手机成了我们日常生活中不可或缺的伙伴。手机上的待办事项工具像一个可靠的助手,可以帮助我们更好地记录、管理和完成任务。在手机桌面上使用的待办事项APP推荐用哪一个呢? 手机是我们…

【QT】信号和槽

一、前置示例代码 main.cpp #include "widget.h"#include <QApplication>int main(int argc, char *argv[]) {QApplication a(argc, argv); // 应用程序对象a&#xff0c;在Qt中&#xff0c;应用程序对象&#xff0c;有且仅有一个。Widget w; // 窗口对…

python sqlalchemy(ORM)- 02 表关系

文章目录 表关系ORM表示 1v1ORM表示 1vm 表关系 1:1&#xff0c;表A 中的一条记录&#xff0c;仅对应表B中的一条记录&#xff1b;表B的一条记录&#xff0c;仅对应表A的一条记录。1:m&#xff0c;表A中的一条记录&#xff0c;对应表B中的多条记录&#xff0c;表B中的一条记录…

编译原理-词法分析器

文章目录 对于词法分析器的要求概念词法分析器的功能和输出形式 词法分析器的设计词法分析器的结构单词符号的识别&#xff1a;超前搜索状态转换图 正规表达式和有限自动机正规式和正规集确定有限自动机&#xff08;DFA&#xff09;非确定有限自动机&#xff08;NFA&#xff09…

多级缓存入门

文章目录 什么是多级缓存JVM进程缓存环境准备安装MySQL导入Demo工程导入商品查询页面 初识Caffeine Lua语法初识Lua第一个lua程序变量和循环Lua的数据类型声明变量循环 条件控制、函数函数条件控制 多级缓存安装OpenRestyOpenResty快速入门反向代理流程OpenResty监听请求编写it…

scrapy的安装和使用

一、scrapy是什么&#xff1a;Scrapy是一个为了爬取网站数据&#xff0c;提取结构性数据而编写的应用框架&#xff0c;可以应用在包括数据挖掘&#xff0c;信息处理或存储历史数据等一系列的程序 二、scrapy的安装&#xff1a;pip install scrapy -i https://pypi.douban.com/…

CURL简单使用

前言 最近做项目&#xff0c;需要服务器实现网络是否通畅&#xff0c;比如通过健康检查接口&#xff0c;但是只能linux服务器测试&#xff0c;很可能还需要测试h2&#xff0c;所以想到了curl&#xff0c;整理一版简单用法。 curl 实际上curl是有官网的&#xff0c;只不过比较…

java中的异常,以及出现异常后的处理【try,catch,finally】

一、异常概念 异常 &#xff1a;指的是程序在执行过程中&#xff0c;出现的非正常的情况&#xff0c;最终会导致JVM的非正常停止。 注意: 在Java等面向对象的编程语言中&#xff0c;异常本身是一个类&#xff0c;产生异常就是创建异常对象并抛出了一个异常对象。Java处理异常的…

C语言-面试题实现有序序列合并

要求&#xff1a; a.输入两个升序排列的序列&#xff0c;将两个序列合并为一个有序序列并输出。 数据范围&#xff1a; 1≤n,m≤1000 1≤n,m≤1000 &#xff0c; 序列中的值满足 0≤val≤30000 输入描述&#xff1a; 1.输入包含三行&#xff0c; 2.第一行包含两个正整数n, m&am…

Modbus协议详解4:RTU帧 ASCII帧的差错校验

前面已经分析过RTU帧和ASCII帧的报文区别&#xff0c;细心的朋友应该会发现在两种不同的报文传输模式下都有一个共同的组成部分——差错校验。 这个差错校验在RTU模式和ASCII模式下也不是不相同的。看下面的对比&#xff1a; RTU模式的差错校验&#xff1a; ASCII模式的差错校验…

对GRUB和initramfs的小探究

竞赛时对操作系统启动过程产生了些疑问&#xff0c;于是问题导向地浅浅探究了下GRUB和initramfs相关机制&#xff0c;相关笔记先放在这里了。 内核启动流程 在传统的BIOS系统中&#xff0c;计算机具体的启动流程如下&#xff1a; 电源启动&#xff1a;当计算机的电源打开时&…

CPU眼里的C/C++:1.2 查看变量和函数在内存中的存储位置

写一个很简单的 c 代码&#xff0c;打印一些“地址”&#xff0c; 也就是变量、函数的“存储位置”&#xff1a;当程序被加载到内存后&#xff0c;它们具体是存在哪里&#xff0c;可以用精确的数值来表示&#xff0c;这就是内存地址。 https://godbolt.org/z/Ghh9ThY5Y #inc…

电解电容寿命与哪些因素有关?

电解电容在各类电源及电子产品中是不可替代的元器件&#xff0c;这些电子产品中由于应用环境的原因&#xff0c;使它成为最脆弱的一环&#xff0c;所以&#xff0c;电解电容的寿命也直接影响了电子产品的使用寿命。 一、电解电容失效模式与因素概述 铝电解电容器正极、负极引出…

Java实现ORM第一个api-FindAll

经过几天的业余开发&#xff0c;今天终于到ORM对业务api本身的实现了&#xff0c;首先实现第一个查询的api 老的C#定义如下 因为Java的泛型不纯&#xff0c;所以无法用只带泛型的方式实现api&#xff0c;对查询类的api做了调整&#xff0c;第一个参数要求传入实体对象 首先…

android——自定义控件(编辑框)、悬浮窗

一、自定义编辑框 效果图&#xff1a; 主要的代码为&#xff1a; class EditLayout JvmOverloads constructor(context: Context, attrs: AttributeSet? null, defStyleAttr: Int 0 ) : ConstraintLayout(context, attrs, defStyleAttr) {private var editTitle: Stringpr…

Android12 启动页适配

印象中&#xff0c;在2022年末接到了一个针对Android12启动页适配的需求&#xff0c;当时也使用了一些适配方案&#xff0c;也写了一个Demo&#xff0c;但是最终没有付诸适配行动&#xff1b;当然并不是适配失败&#xff0c;而是根据官方适配方案适配后太丑了… 1024纪念文章&a…

Java中的CAS简述

目录 1、CAS是什么 2、CAS的生活化例子 3、Java中的atomic包 4、unsafe类 5、CAS的缺点及解决方案 小结 1、CAS是什么 CAS&#xff08;Compare and Swap&#xff09;是一种并发编程中的原子操作&#xff0c;用于实现多线程环境下的无锁同步。它是一种乐观锁的实现方式&a…

分布式限流:Redis

目录 1:如何实现分布式限流 2:限流的几种类别 2.1:固定窗口限流 2.2:滑动窗口限流 2.3:漏桶限流 2.4:令牌桶限流 3:实现分布式限流:Redis 3.1:引入Redisson的依赖包 3.2:初始化Redisson 3.3:创建Redisson的限流类 1:如何实现分布式限流 1:把统计用户的使用频率等这些…

Springcloud介绍

1.基本介绍 Spring Cloud是一系列框架的有序集合。它利用Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发&#xff0c;如服务发现注册、配置中心、消息总线、负载均衡、断路器、数据监控等&#xff0c;都可以用Spring Boot的开发风格做到一键启动和部署。Spring …

欧拉图和哈密顿图

欧拉图 在连通图G中&#xff0c;经过G的每条边一次且仅一次的通路&#xff0c;称为欧拉通路若欧拉通路为回路&#xff0c;则称为欧拉回路含有欧拉回路的图称为欧拉图有欧拉通路则G可以一笔画出有欧拉回路则G是连通的且无奇点&#xff08;欧拉图无奇点&#xff09; 哈密顿图 …