python工作任务流flow实时框架:prefect
prefect是一个python的工作任务流调度实时框架,prefect可以快速构建平台系统复杂模块间工作流的监测。当平台系统模块之间的调用链越来越复杂时候,任务执行起来,已经很难盘点清楚程序逻辑和工作流运转的路线图。prefect通过简单的@注解,即可清晰在线、实时的梳理清楚系统运行“思维导图”。
prefect是github的开源项目:
GitHub - PrefectHQ/prefect: The easiest way to coordinate your dataflowThe easiest way to coordinate your dataflow. Contribute to PrefectHQ/prefect development by creating an account on GitHub.https://github.com/PrefectHQ/prefect
安装:
pip install -U prefect
工作任务流例子
@flow可以嵌套使用,@flow里面可以调用@task的任务。但@task不能反过调用@flow,原则上尽量使用@flow,@task是程序代码的最小原子单位。
import time
from prefect import flow, task, get_run_logger
@task(retries=2, retry_delay_seconds=30)
def failure():
print('running')
raise Exception('异常')
@flow(name="my_flow_b", timeout_seconds=15)
def flowb(param: str):
time.sleep(20)
return 'OKb'
@flow(name="my_flow_a")
def flowa(param: str):
time.sleep(15)
failure.submit()
return 'OKa'
@flow(name="my_flow_2", description='第二个工作流')
def flow2():
logger = get_run_logger()
logger.info('启动flow2')
time.sleep(3)
flowb('start b')
return 'OK2'
@flow(name="my_flow_1", description='第一个工作流')
def flow1():
logger = get_run_logger()
logger.info('启动flow1')
time.sleep(2)
flowa('start a')
return 'OK1'
if __name__ == '__main__':
flow1()
flow2()
程序运行后,在控制台使用命令启动prefect服务器端:
prefect orion start
默认prefect启动在http://127.0.0.1:4200,浏览器打开:
查看flow1运行时日志:
flowa中的实时log日志:
flowb的启动参数
prefect的配置管理和设置
可以在控制台查看prefect的配置项:
prefect config view
默认的prefect启动端口在4200,用户通过127.0.0.1:4200访问,可以通过配置设置新的启动端口:
prefect config set PREFECT_ORION_API_PORT=1234
指定prefect启动在1234端口。
然后重启服务器即可:
prefect orion start
prefect的API开发文档
prefect的后端服务器也可以通过REST API访问调用。prefect的文档可以直接查看:
http://127.0.0.1:4200/docs
http://127.0.0.1:4200/redoc
以上效果均在Windows环境下跑出来。prefect有更多的功能,值得研究和使用。