FATE Flow 源码解析 - 日志输出机制

news2025/1/11 11:39:29

背景介绍

在 之前的文章 中介绍了 FATE 的作业处理流程,在实际的使用过程中,为了查找执行中的异常,需要借助运行生成的日志,但是 FATE-Flow 包含的流程比较复杂,对应的日志也很多,而且分散在不同的文件中,在这篇文章中就对 FATE-Flow 的日志机制进行梳理,帮助大家了解 Python 服务中实现一个更加灵活的日志机制

logging 机制

FATE-Flow 中日志主要基于 Python 原生的 logging 机制实现,也正是依靠强大而灵活的 logging 机制,FATE-Flow 才实现了丰富的日志查询能力,基础的 logging 知识可以自行查看官方文档。

为了了解 FATE-Flow 的日志实现机制,涉及的一个关键概念就是 Handler, Handler 是实际处理日志的模块,在 logging 中被设计为可插拔模式的,可以根据需要进行注册。举一个简单的例子进行介绍:

# 创建一个 logger 对象

logger = logging.getLogger(name)

# 注册日志处理器 handler1, 将日志输出至 a.log 文件中

handler1 = TimedRotatingFileHandler("a.log")
logger.addHandler(handler1)

# 注册日志处理器 handler2, 将日志输出至 b.log 文件中

handler2 = TimedRotatingFileHandler("b.log")
logger.addHandler(handler2)

# 实际输出日志

logger.info("test log")

在上面的例子中会基于 logging 机制创建出日志实例 logger ,并注册了两个 Handler,后续输出的日志就会依次被两个 Handler 进行处理,即日志最终会输出至文件 a.logb.log 中,而且在实际运行中可以动态调整 Handler,从而动态调整日志处理方式,因此可以实现极其灵活的日志输出方式

理解了这个机制,就很容易理解 FATE-Flow 如何实现将同样的日志内容按照日志类型输出至 fate_flow_schedule.logfate_flow_audit.log 文件中,同时又按照日志等级输出至 DEBUG.log, INFO.log, WARNING.log, ERROR.log 文件中了。FATE-Flow 就是通过添加不同的 Handler 实现的

FATE-Flow 日志

FATE-Flow 的日志主要存在于 /data/projects/fate/fateflow/logs 目录下,在此目录下主要分为两种类型的日志目录:

  1. fate_flow 目录,其中包含整体的 FATE-Flow 日志;
  2. 独立 job 目录的日志,此时会以 job_id 为单位输出所有相关的日志至特定目录中;
    最终展示的效果如下所示:

请添加图片描述

其中数字部分是以 job 为单位的日志,右侧 fate_flow 就是 FATE-Flow 整体的日志

其中 fate_flow 中主要是如下所示的日志:

  • DEBUG.log
  • ERROR.log
  • INFO.log
  • WARNING.log
  • fate_flow_access.log
  • fate_flow_audit.log
  • fate_flow_database.log
  • fate_flow_detect.log
  • fate_flow_schedule.log
  • fate_flow_stat.log
  • stat.log

注意这部分日志因为量可能可能比较大,因此 FATE-Flow 中是按天进行分片,因此最终的效果如下所示的:

请添加图片描述

而 按照 job_id 为单位的日志主要包含如下所示的日志:

  • fate_flow_audit.log
  • fate_flow_schedule_error.log
  • fate_flow_schedule.log
  • fate_flow_sql.log
  • guest (目录)
  • host (目录)

其中 guest/host 是按照 job 执行中参与的角色为单位输出的日志,后续介绍中再进一步展开

实现原理

下面按照目录分别介绍 FATE-Flow 不同模块的的日志输出相关的实现

fate_flow 目录日志

fate_flow 目录下的日志分为按照类型的日志 fate_flow_schedule.log, fate_flow_audit.log 等,同时也包含按照日志等级的 DEBUG.log, INFO.log, WARNING.log, ERROR.log
事实上,这两种类型的日志中的内容可能有很多相同的内容,只是按照不同维度区分后放在不同的文件中,比如想查询任务调度相关的日志,可以去查询 fate_flow_schedule.log,而想查询执行中警告相关的日志,可以去查询 WARNING.log, 而任务调度中也可能有处于警告级别的日志,此时在两个文件中都可能会存在。

我们可以先类型相关的日志是如何存入的,这边以 fate_flow_schedule.log 为例进行解释:
此日志使用的是 fate_flow/utils/log_utils.py 中的 schedule_logger() 方法进行日志输出,对应输出至 fate_flow 目录相关的代码如下:

def schedule_logger(job_id=None, delete=False):
    if not job_id:
        return getLogger("fate_flow_schedule")

此时调用的是 fate_arch/common/log.py 中的 getLogger() 方法去获取对应日志实例 logger, 最终会调用 LoggerFactory.init_logger() 初始化日志实例,对应的代码如下所示:

def init_logger(class_name):
    # 初始化日志实例 logger,此时对应的 class_name 为 fate_flow_schedule

    logger = LoggerFactory.new_logger(class_name)

    # 创建 logger 对应的 Handler,此 Handler 负责将日志输出至对应文件中

    handler = LoggerFactory.get_handler(class_name)

    # 将 handler 添加至 logger 中

    logger.addHandler(handler)

    LoggerFactory.logger_dict[class_name] = logger, handler

    # 额外配置新 handler,按照等级输出至 `DEBUG.log`, `INFO.log`, `WARNING.log`, `ERROR.log` 就是在这边实现

    LoggerFactory.assemble_global_handler(logger)
    return logger, handler

可以看看 handler 是如何输出至文件中的,对应于 get_handler() 方法,具体实现如下:

def get_handler(class_name, level=None, log_dir=None, log_type=None, job_id=None):
    # 日志对应的文件

    log_file = os.path.join(LoggerFactory.LOG_DIR, "{}.log".format(class_name))

    # 最终使用 TimedRotatingFileHandler 实现按天的日志搜集

    os.makedirs(os.path.dirname(log_file), exist_ok=True)
    if LoggerFactory.log_share:
        handler = ROpenHandler(log_file,
                                when='D',
                                interval=1,
                                backupCount=14,
                                delay=True)
    else:
        handler = TimedRotatingFileHandler(log_file,
                                            when='D',
                                            interval=1,
                                            backupCount=14,
                                            delay=True)

    return handler

可以看到最终就是根据 classname 确定对应的文件路径,并最终创建 TimedRotatingFileHandler,此 Handler 会将日志输出至特定文件中,并按天进行日志轮转,最终实现上面图片中看到的按天分片的日志。

而按照日志等级的输出至文件中的实现是在 LoggerFactory.assemble_global_handler() 中实现的,起始就是依次创建各个日志层级的 Handler,并添加至 logger 上,对应的实现如下所示:

def assemble_global_handler(logger):
    if LoggerFactory.LOG_DIR:
        # 对所有日志层级,依次进行注册

        for level in LoggerFactory.levels:
            if level >= LoggerFactory.LEVEL:

                # 获得的 level_logger_name 分别是 DEBUG, DEBUG, WARNING, ERROR,最终用于输出的文件命名

                level_logger_name = logging._levelToName[level]

                # 创建特定日志层级的 Handler,添加至 logger 上

                logger.addHandler(LoggerFactory.get_global_handler(level_logger_name, level))

而调用的 LoggerFactory.get_global_handler() 事实上也是调用 get_handler() 方法创建 handler,而指定了 level 的情况下 handler 中会额外使用 handler.level = level 仅输出特定日志层级的日志。从而实现了将日志按照层级输出

而其他类型的日志原理都是一样,在 python/fate_flow/settings.py 中初始化了对应类型的日志实例 logger,业务中直接使用,具体代码如下所示:

stat_logger = getLogger("fate_flow_stat")
detect_logger = getLogger("fate_flow_detect")
access_logger = getLogger("fate_flow_access")
database_logger = getLogger("fate_flow_database")
job 日志

job 对应的日志也包含两种,一种是特定的几种类型的日志,比如 fate_flow_audit.log, fate_flow_sql.log, fate_flow_schedule.log,这部分的实现比较简单,具体是在 fate_flow/utils/log_utils.py 中,其中每个类型会包含一个特定的方法。

比如 schedule_logger() 方法用于获得对应日志实例 logger 输出日志至 fate_flow_schedule.log,而 sql_logger() 方法用于获得对应的日志实例 logger 输出日志至 fate_flow_sql.log,这些方法底层都是调用同样的 get_job_logger() 生成对应的日志实例的。具体实现如下所示:

def get_job_logger(job_id, log_type):
    # 主要输出至 fate_flow 目录 和 job_id 相关的目录,包含 job_id 时会以 job_id 为单位分开输出

    job_log_dir = get_fate_flow_directory('logs', job_id)

    # 包含 job_id 会输出至 job_id 目录,audit 应该是请求调用跟踪日志,同时出现在两个 job_id 目录以及 fate_flow 目录

    if log_type == 'audit':
        log_dirs = [job_log_dir, fate_flow_log_dir]
    else:
        log_dirs = [job_log_dir]

    # 创建对应的目录

    os.makedirs(job_log_dir, exist_ok=True)
    os.makedirs(fate_flow_log_dir, exist_ok=True)

    # 生成对应的 logger,并对应匹配创建对应的 handler,从而实现日志输出至特定目录

    logger = LoggerFactory.new_logger(f"{job_id}_{log_type}")

    for job_log_dir in log_dirs:
        # 创建对应的 handler,将日志输出特定文件中,此时文件根据 log_type 命名,此时就是 fate_flow_schedule

        handler = LoggerFactory.get_handler(class_name=None, level=LoggerFactory.LEVEL,
                                            log_dir=job_log_dir, log_type=log_type, job_id=job_id)
        error_handler = LoggerFactory.get_handler(class_name=None, level=logging.ERROR, log_dir=job_log_dir, log_type=log_type, job_id=job_id)
        logger.addHandler(handler)
        logger.addHandler(error_handler)

    return logger

可以看到实现原理也是类似的,就是创建对应日志实例 logger,并生成对应的 handler,从而输出至特定文件中

job 中日志还有一部分的日志对应一个独立的目录 guest/host ,这部分是按照 job 训练中参与的角色为单位输出的日志,这些日志是如何被写入文件中的呢?

对应的代码在 fate_flow/controller/task_controller.py 中的 start_task() 方法,可以看到 task 实际执行的代码如下所示:

backend_engine = build_engine(run_parameters.computing_engine)

# task 执行中对应的日志输出至 get_job_log_directory() 指定的 logs/{job_id}/{role}/{party_id}/{component_name} 对应的目录

run_info = backend_engine.run(task=task,
                                run_parameters=run_parameters,
                                run_parameters_path=run_parameters_path,
                                config_dir=config_dir,
                                log_dir=job_utils.get_job_log_directory(job_id, role, party_id, component_name),
                                cwd_dir=job_utils.get_job_directory(job_id, role, party_id, component_name),
                                user_name=kwargs.get("user_id"))

由于不同的执行引擎代码不同,我们以 eggroll 为例去查看对应的 run() 执行情况,此方法调用 WorkerManager.start_task_worker() 方法,并将日志目录通过 log_dir 传入,代码如下所示:

   def start_task_worker(cls, worker_name, task: Task, task_parameters: RunParameters = None,
                          executable: list = None, extra_env: dict = None, **kwargs):
        # 根据 task 执行情况确定最终日志输出的目录 log_dir, 最终使用 logs/{job_id}/{role}/{party_id}/{component_name} 为日志目录

        worker_id, config_dir, log_dir = cls.get_process_dirs(worker_name=worker_name,
                                                              job_id=task.f_job_id,
                                                              role=task.f_role,
                                                              party_id=task.f_party_id,
                                                              task=task)

        # 实际执行的代码对应的文件

        from fate_flow.worker.task_executor import TaskExecutor
        module_file_path = sys.modules[TaskExecutor.__module__].__file__

        # 实际执行进程,使用 python 执行对应的代码

        env = cls.get_env(task.f_job_id, task.f_provider_info)
        if executable:
            process_cmd = executable
        else:
            process_cmd = [env.get("PYTHON_ENV") or sys.executable or "python3"]

        # 执行命令行,通过 log_dir 指定了对应的日志目录

        common_cmd = [
            module_file_path,
            "--job_id", task.f_job_id,
            "--component_name", task.f_component_name,
            "--task_id", task.f_task_id,
            "--task_version", task.f_task_version,
            "--role", task.f_role,
            "--party_id", task.f_party_id,
            "--config", config_path,
            '--result', result_path,
            "--log_dir", log_dir,
            "--parent_log_dir", os.path.dirname(log_dir),
            "--worker_id", worker_id,
            "--run_ip", RuntimeConfig.JOB_SERVER_HOST,
            "--run_port", RuntimeConfig.HTTP_PORT,
            "--job_server", f"{RuntimeConfig.JOB_SERVER_HOST}:{RuntimeConfig.HTTP_PORT}",
            "--session_id", session_id,
            "--federation_session_id", federation_session_id
        ]
        process_cmd.extend(common_cmd)

        # 启动新进程执行对应的命令

        p = process_utils.run_subprocess(job_id=task.f_job_id, config_dir=config_dir, process_cmd=process_cmd,
                                         added_env=env, log_dir=log_dir, cwd_dir=config_dir, process_name=worker_name.value,
                                         process_id=worker_id)
        return {"run_pid": p.pid, "worker_id": worker_id, "cmd": process_cmd}

而实际使用 log_dir 是在 fate_flow/worker/base_worker.py 中完成的,此时会调用 LoggerFactory.set_directory() 指定日志输出的父目录,后续即可将任务相关的日志输出至指定的目录中了

总结

这边介绍了 FATE-Flow 相关的日志存储情况,相对大部分服务而言,FATE-Flow 中的日志存储都属于比较复杂的了,但是借助 logging 中动态的 Handler 更新机制,可以比较方便地实现不同模块独立的日志文件输出路径,甚至可以实现部分日志输出至日志文件,部分日志输出至终端,有兴趣的也可以进一步了解下

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1932727.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

windows和linux的等保加固测评的经验分享

一头等保加固测评的牛马,需要能做到一下午测评n个服务器 接下来就讲讲如何当一头xxxxxxxxx》严肃的等保测评加固的经验分享( 一、window等保 首先你要自己按着教程在虚拟机做过一遍(win2012和win2008都做过一遍,大概windows的…

抖音短视频seo矩阵系统源码(搭建技术开发分享)

#抖音矩阵系统源码开发 #短视频矩阵系统源码开发 #短视频seo源码开发 一、 抖音短视频seo矩阵系统源码开发,需要掌握以下技术: 网络编程:能够使用Python、Java或其他编程语言进行网络编程,比如使用爬虫技术从抖音平台获取数据。…

How to integrate GPT-4 model hosted on Azure with the gptstudio package

题意:怎样将托管在Azure上的GPT-4模型与gptstudio包集成? 问题背景: I am looking to integrate the OpenAI GPT-4 model into my application. Here are the details I have: Endpoint: https://xxxxxxxxxxxxxxx.openai.azure.com/Locatio…

Gradio从入门到精通(8)---基础组件介绍2

文章目录 一、基础组件详解1.Dropdown2.Image3.Audio4.File 总结 一、基础组件详解 1.Dropdown 用途: 提供下拉菜单选项。 初始化参数: choices: 可选择的选项列表。 value: 默认选中的值。可以是字符串、列表或是一个可调用对象。 type: 组件返回…

【有效验证】解决SQLyog连接MYSQL的错误 1251 - Client does not support

目录 一、原因分析: 二、进入到mysql 三、查看当前加密方式 四、更改加密方式 五、查看是否成功 前言:使用一个开源软件使用sqlyog、navcat都报1251错误,网上都是提示升级客户端,还有一种就是修改mysql配置。本文就是修改配置…

逆向案例二十三——请求头参数加密,某区块链交易逆向

网址:aHR0cHM6Ly93d3cub2tsaW5rLmNvbS96aC1oYW5zL2J0Yy90eC1saXN0L3BhZ2UvNAo 抓包分析,发现请求头有X-Apikey参数加密,其他表单和返回内容没有加密。 直接搜索关键字,X-Apikey,找到疑似加密位置,注意这里…

R语言实现SVM算法——分类与回归

### 11.6 基于支持向量机进行类别预测 ### # 构建数据子集 X <- iris[iris$Species! virginica,2:3] # 自变量&#xff1a;Sepal.Width, Petal.Length y <- iris[iris$Species ! virginica,Species] # 因变量 plot(X,col y,pch as.numeric(y)15,cex 1.5) # 绘制散点图…

【DGL系列】DGLGraph.out_edges简介

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 目录 函数说明 用法示例 示例 1: 获取所有边的源节点和目标节点 示例 2: 获取特定节点的出边 示例 3: 获取所有边的边ID 示例 4: 获取所有信息&a…

【概率论三】参数估计:点估计(矩估计、极大似然法)、区间估计

文章目录 一. 点估计1. 矩估计法2. 极大似然法2.1. 似然函数2.2. 极大似然估计法 3. 评价估计量的标准3.1. 无偏性3.2. 有效性3.3. 一致性 二. 区间估计1. 区间估计的概念2. 正态总体参数的区间估计 参数估计讲什么 由样本来确定未知参数参数估计分为点估计与区间估计 一. 点估…

Ubuntu20.04从零开搭PX4MavrosGazebo环境并测试

仅仅是个人搭建记录 参考链接&#xff1a; https://zhuanlan.zhihu.com/p/686439920 仿真平台基础配置&#xff08;对应PX4 1.13版&#xff09; 语雀 mkdir -p ~/tzb/catkin_ws/src mkdir -p ~/tzb/catkin_ws/scripts cd catkin_ws && catkin init catkin build cd…

ECMP等价多路由机制,大模型训练负载均衡流量极化冲突原因,万卡(大规模)集群语言模型(LLM)训练流量拥塞特点

大规模集群&#xff0c;大语言模型(LLM)训练流量特点&#xff0c;ECMP&#xff08;Equal-Cost Multi-Path Routing&#xff09;流量极化拥塞原因。 视频分享在这&#xff1a; 2.1 ECMP等价多路由&#xff0c;大模型训练流量特点&#xff0c;拥塞冲突极化产生原因_哔哩哔哩_bi…

【GraphRAG】微软 graphrag 效果实测

GraphRAG 本文将基于以下来源&#xff0c;对Microsoft GraphRAG分析优缺点、以及示例实测分析。 1. Source 代码仓库&#xff1a; Welcome to GraphRAGhttps://microsoft.github.io/graphrag/ 微软文章1&#xff08;2024.2.13&#xff09;&#xff1a;GraphRAG: Unlocking…

电脑系统重装数据被格式化,那些文件还有办法恢复吗?

在日常使用电脑的过程中&#xff0c;系统重装或格式化操作是常见的维护手段&#xff0c;尤其是在遇到系统崩溃、病毒感染或需要升级系统时。然而&#xff0c;这一操作往往伴随着数据丢失的风险&#xff0c;尤其是当C盘&#xff08;系统盘&#xff09;和D盘&#xff08;或其他数…

【linux】信号的理论概述和实操

目录 理论篇 信号概述 信号的分类 信号机制 理解硬件中断 异步 信号对应的三种动作 信号产生的条件 终端按键 系统调用 软件条件 硬件异常 除0错误 野指针 OS对于错误的态度 信号在进程中的内核数据结构 信号的处理 CPU的内核态和用户态概述 进程处理信号的时…

MATLAB科研数据可视化教程

原文链接&#xff1a;MATLAB科研数据可视化https://mp.weixin.qq.com/s?__bizMzUzNTczMDMxMg&mid2247609462&idx3&snf7043936fc5ee42b833c7c9f3bcd24ba&chksmfa826d91cdf5e4872eb275e5319b66ba6927ea0074fb2293fe1ca47d6aedf38ab91050be484c&token1551213…

FPGA 实现DDR4的读写

1 硬件设计 FPGA 端&#xff1a; DDR4: 2 验证方案 3 仿真验证 4 DDR4 下板验证

Qt模型/视图架构——委托(delegate)

一、为什么需要委托 模型&#xff08;model&#xff09;用来数据存储&#xff0c;视图&#xff08;view&#xff09;用来展示数据。因此&#xff0c;模型/视图架构是一种将数据存储和界面展示分离的编程方法。具体如下图所示&#xff1a; 由图可知&#xff0c;模型向视图提供数…

鼠标的发明和鼠标“变形记”

注&#xff1a;机翻&#xff0c;未校对。 Who Invented the Computer Mouse? 谁发明了电脑鼠标&#xff1f; It was technology visionary and inventor Douglas Engelbart (January 30, 1925 – July 2, 2013) who revolutionized the way computers worked, turning it fr…

【unity实战】使用unity制作一个红点系统

前言 注意&#xff0c;本文是本人的学习笔记记录&#xff0c;这里先记录基本的代码&#xff0c;后面用到了再回来进行实现和整理 素材 https://assetstore.unity.com/packages/2d/gui/icons/2d-simple-ui-pack-218050 框架&#xff1a; RedPointSystem.cs using System.…

Jmeter关联

案例脚本实现&#xff1a;选择商品加入购物车 客户端发送一个登录的HTTP请求&#xff0c;服务端返回一个带着token的响应&#xff0c;后续发出一个带token信息的加入购物车的HTTP请求&#xff0c;返回响应。 关联&#xff1a;当请求直接由依赖关系的时候&#xff0c;比如一个请…