ACK One Argo工作流:实现动态 Fan-out/Fan-in 任务编排

news2025/1/11 13:59:21

作者:庄宇

什么是 Fan-out Fan-in

在工作流编排过程中,为了加快大任务处理的效率,可以使用 Fan-out Fan-in 任务编排,将大任务分解成小任务,然后并行运行小任务,最后聚合结果。

图片

由上图,可以使用 DAG(有向无环图)编排 Fan-out Fan-in 任务,子任务的拆分方式分为静态和动态,分别对应静态 DAG 和动态 DAG。动态 DAG Fan-out Fan-in 也可以理解为 MapReduce。每个子任务为 Map,最后聚合结果为 Reduce。

静态 DAG: 拆分的子任务分类是固定的,例如:在数据收集场景中,同时收集数据库 1 和数据库 2 中的数据,最后聚合结果。

动态 DAG: 拆分的子任务分类是动态的,取决于前一个任务的输出结果,例如:在数据处理场景中,任务 A 可以扫描待处理的数据集,为每个子数据集(例如:一个子目录)启动子任务 Bn 处理,当所有子任务 Bn 运行结束后,在子任务 C 中聚合结果,具体启动多少个子任务 B 取决由任务 A 的输出结果。根据实际的业务场景,可以在任务 A 中自定义子任务的拆分规则。

ACK One 分布式工作流 Argo 集群

在实际的业务场景中,为了加快大任务的执行,提升效率,往往需要将一个大任务分解成数千个子任务,为了保证数千个子任务的同时运行,需要调度数万核的 CPU 资源,叠加多任务需要竞争资源,一般 IDC 的离线任务集群难以满足需求。例如:自动驾驶仿真任务,修改算法后的回归测试,需要对所有驾驶场景仿真,每个小驾驶场景的仿真可以由一个子任务运行,开发团队为加快迭代速度,要求所有子场景测试并行执行。

如果您在数据处理,仿真计算和科学计算等场景中,需要使用动态 DAG 的方式编排任务,或者同时需要调度数万核的 CPU 资源加快任务运行,您可以使用阿里云 ACK One 分布式工作流 Argo 集群 [ 1]

ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow [ 2] ,提供售后支持,支持动态 DAG Fan-out Fan-in 任务编排,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,运行完成后及时回收资源节省成本。支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。

Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。

本文介绍使用 Argo Workflow 编排动态 DAG Fan-out Fan-in 任务。

Argo Workflow 编排 Fan-out Fan-in 任务

我们将构建一个动态 DAG Fan-out Fan-in 工作流,读取阿里云 OSS 对象存储中的一个大日志文件,并将其拆分为多个小文件(split),启动多个子任务分别计算每个小文件中的关键词数量(count),最后聚合结果(merge)。

  1. 创建分布式工作流 Argo 集群 [ 3]

  2. 挂载阿里云 OSS 存储卷,工作流可以像操作本地文件一样,操作阿里云 OSS 上的文件。参考:工作流使用存储卷 [ 4]

  3. 使用以下工作流 YAML 创建一个工作流,参考:创建工作流 [ 5] 。具体说明参见注释。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dynamic-dag-map-reduce-
spec:
  entrypoint: main
  # claim a OSS PVC, workflow can read/write file in OSS through PVC. 
  volumes:
    - name: workdir
      persistentVolumeClaim:
        claimName: pvc-oss
  # how many tasks to split, default is 5.
  arguments:
    parameters:
      - name: numParts
        value: "5"
  templates:
    - name: main
      # DAG definition.
      dag:
        tasks:
          # split log files to several small files, based on numParts.
          - name: split
            template: split
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
          # multiple map task to count words in each small file.
          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
            depends: "split"
            # run as a loop, partId from split task json outputs.
            withParam: '{{tasks.split.outputs.result}}'
          - name: reduce
            template: reduce
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
            depends: "map"
    # The `split` task split the big log file to several small files. Each file has a unique ID (partId).
    # Finally, it dumps a list of partId to stdout as output parameters
    - name: split
      inputs:
        parameters:
          - name: numParts
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["split.py"]
        env:
        - name: NUM_PARTS
          value: "{{inputs.parameters.numParts}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    # One `map` per partID is started. Finds its own "part file" and processes it.
    - name: map
      inputs:
        parameters:
          - name: partId
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["count.py"]
        env:
        - name: PART_ID
          value: "{{inputs.parameters.partId}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    # The `reduce` task takes the "results directory" and returns a single result.
    - name: reduce
      inputs:
        parameters:
          - name: numParts
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["merge.py"]
        env:
        - name: NUM_PARTS
          value: "{{inputs.parameters.numParts}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
      outputs:
        artifacts:
          - name: result
            path: /mnt/vol/result.json
  1. 动态 DAG 实现

1)split 任务在拆分大文件后,会在标准输出中输出一个 json 字符串,包含:子任务要处理的 partId,例如:

["0", "1", "2", "3", "4"]

2)map 任务使用 withParam 引用 split 任务的输出,并解析 json 字符串获得所有 {{item}},并使用每个 {{item}} 作为输入参数启动多个 map 任务。

          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
            depends: "split"
            withParam: '{{tasks.split.outputs.result}}'

更多定义方式,请参考开源 Argo Workflow 文档 [ 6]

  1. 工作流运行后,通过分布式工作流 Argo 集群控制台 [ 7] 查看任务 DAG 流程与运行结果。

图片

  1. 阿里云 OSS 文件列表,log-count-data.txt 为输入日志文件,split-output,cout-output 中间结果目录,result.json 为最终结果文件。

图片

  1. 示例中的源代码可以参考:AliyunContainerService GitHub argo-workflow-examples [ 8]

总结

Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。

阿里云 ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow,提供售后支持,加固控制面实现数万子任务(Pod)稳定高效调度运行,数据面支持无服务器方式调度云上大规模算力,无需运维集群或者节点,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。

欢迎加入 ACK One 客户交流钉钉群与我们进行交流。(钉钉群号:35688562

相关链接:

[1] 阿里云 ACK One 分布式工作流 Argo 集群

https://help.aliyun.com/zh/ack/overview-12

[2] Argo Workflow

https://argo-workflows.readthedocs.io/en/latest/

[3] 创建分布式工作流 Argo 集群

https://help.aliyun.com/zh/ack/create-a-workflow-cluster

[4] 工作流使用存储卷

https://help.aliyun.com/zh/ack/use-volumes

[5] 创建工作流

https://help.aliyun.com/zh/ack/create-a-workflow

[6] 开源 Argo Workflow 文档

https://argo-workflows.readthedocs.io/en/latest/walk-through/loops/

[7] 分布式工作流 Argo 集群控制台

https://account.aliyun.com/login/login.htm?oauth_callback=https%3A%2F%2Fcs.console.aliyun.com%2Fone%3Fspm%3Da2c4g.11186623.0.0.7e2f1428OwzMip#/argowf/cluster/detail

[8] AliyunContainerService GitHub argo-workflow-examples

https://github.com/AliyunContainerService/argo-workflow-examples/tree/main/log-count

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1436791.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Vitis】基于C++函数开发组件的步骤

目录 基本步骤 关键领域 • 硬件接口: 任务级并行度: 存储器架构: 微观级别的最优化: 基本步骤 1. 基于 设计原则 建立算法架构。 2. (C 语言仿真) 利用 C/C 语言测试激励文件验证 C/C 代码的逻辑。…

2024/2/6学习记录

ts 因为已经学习过了 js ,下面的都是挑了一些 ts 与 js 不同的地方来记录。 安装 npm install -g typescript 安装好之后,可以看看自己的版本 ts基础语法 模块 函数 变量 语法和表达式 注释 编译 ts 文件需要用 tsc xxx.ts ,js 文件…

Dockerfile文件参数配置和使用

天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…

动态扩缩容下的全局流水号设计

关于全局流水号,业内用的比较多的就是雪花算法,一直没理解在动态扩缩容下其中的workId和 datacenterId如何设置,查到了几个方法:reidis中取,待后期实践下。 先简单的介绍一下雪花算法,雪花算法生成的Id由…

【Zookeeper】what is Zookeeper?

官网地址:https://zookeeper.apache.org/https://zookeeper.apache.org/ 以下来自官网的介绍 ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. A…

AI专题:AI应用落地的商业模式探索

今天分享的是AI 系列深度研究报告:《AI专题:AI应用落地的商业模式探索》。 (报告出品方:国金证券) 报告共计:27页 AI基座模型提供按量收费服务 以 ChatGPT 为代表的大模型能力涌现,为基座模型厂商带来增…

“小手艺”有“大情怀”, 《青春手艺人》赋能乡村振兴,传承新时代文化

文化传承发展要坚持“守正创新”,以守正创新的正气和锐气,赓续历史文脉、谱写当代华章。中央广播电视总台农业农村节目中心推出的聚焦年轻手艺人故事的微纪录片《青春手艺人》,为守正创新的文化传承增添了新的鲜活的青春故事。节目积极响应二…

MATLAB Fundamentals>>>(2/2) Project - Analyze Vehicle Data

#创作灵感# MATLAB基础知识官方课程学习笔记 MATLAB Fundamentals>Common Data Analysis Techniques>Summary of Common Data Analysis Techniques>(2/2) Project - Analyze Vehicle Data 任务名称:Fuel Economy Analysis 任务1: The variabl…

华为机考入门python3--(10)牛客10-字符个数统计

分类:字符 知识点: 字符的ASCII码 ord(char) 题目来自【牛客】 def count_unique_chars(s): # 创建一个空集合来保存不同的字符 unique_chars set() # 遍历字符串中的每个字符 for char in s: # 将字符转换为 ASCII 码并检查是否在范围内 #…

android retrofit上传List集合数据

由于接口需要,retrofit上传不能用POST,因为FormUrlEncoded注解跟Body不能共存,所以更改成了QueryMap 因为需要传参,所先将图片集合转成了Hashmap集合,再使用Gson 将集合转成Json 字符串 ,再转成RequestBody 下面介绍一…

2024年【上海市安全员C3证】考试题库及上海市安全员C3证报名考试

题库来源:安全生产模拟考试一点通公众号小程序 2024年【上海市安全员C3证】考试题库及上海市安全员C3证报名考试,包含上海市安全员C3证考试题库答案和解析及上海市安全员C3证报名考试练习。安全生产模拟考试一点通结合国家上海市安全员C3证考试最新大纲…

微信支付服务商,商户快速进件,减少工作量

大家好,我是小悟 服务商拓展特约商户,人工录入大量商户资料,耗时耗力。商户对标准费率不满意,无法说服商户先签约再帮其调整费率。 为了减少服务商工作量,服务商快速进件工具来了,分为移动端和管理端。用好…

MyBatis多数据源以及动态切换实现(基于SpringBoot 2.7.x)

MyBatis多数据源以及动态切换实现可以实现不同功能模块可以对应到不同的数据库&#xff0c;现在就让我们来讲解一下。 目录 一、引入Maven二、配置文件三、实现多数据源四、动态切换数据源 一、引入Maven 注意&#xff1a;博主这边使用的springboot版本是2.7.14的 <!-- htt…

用 Delphi 程序调用 Python 代码画曲线图

用 Python 的库画图 Python 代码如下&#xff1a; import matplotlib.pyplot as pltsquares [1, 4, 9, 16, 25]; plt.plot(squares); plt.grid(True) # 网格线 plt.show(); # 这句话会弹出个窗口出来&#xff0c;里面是上述数据的曲线。 把以上代码&#xff0c;放进 PyS…

Node.js(五)-跨域(了解)

一 、CORS相关 1. 接口的跨域问题 html: server: 访问结果&#xff1a; 刚才编写的 GET 和 POST接口&#xff0c;存在一个很严重的问题&#xff1a;不支持跨域请求。 解决接口跨域问题的方案主要有两种&#xff1a; ① CORS&#xff08;主流的解决方案&#xff0c;推荐使…

基础面试题整理7之Redis

1.redis持久化RDB、AOF RDB(Redis database) 在当前redis目录下生成一个dump.rdb文件&#xff0c;对redis数据进行备份 常用save、bgsave命令进行数据备份&#xff1a; save命令会阻塞其他redis命令&#xff0c;不会消耗额外的内存&#xff0c;与IO线程同步&#xff1b;bgsav…

gem5学习(17):ARM功耗建模——ARM Power Modelling

目录 一、Dynamic Power States 二、Power Usage Types 三、MathExprPowerModels 四、Extending an existing simulation 五、Stat dump frequency 六、Common Problems 官网教程&#xff1a;gem5: ARM Power Modelling 通过使用gem5中已记录的各种统计数据&#xff0c;…

掌握Web服务器之王:Nginx 学习网站全攻略!

介绍&#xff1a;Nginx是一款高性能的Web服务器&#xff0c;同时也是一个反向代理、负载均衡和HTTP缓存服务器。具体介绍如下&#xff1a; 轻量级设计&#xff1a;Nginx的设计理念是轻量级&#xff0c;这意味着它在占用最少的系统资源的同时提供高效的服务。 高并发能力&#x…

ROS笔记二:launch

目录 launch node标签 参数 参数服务器 节点分组 launch launch文件是一种可以可实现多节点启动和参数配置的xml文件,launch文件用于启动和配置ROS节点、参数和其他相关组件。launch文件通常使用XML格式编写&#xff0c;其主要目的是方便地启动ROS节点和设置节点之间的连…

【Unity优化(一)】音频优化

整理资教程&#xff1a;https://learn.u3d.cn/tutorial/unity-optimization-metaverse 1.音频优化 音频一般不会成为性能瓶颈&#xff0c;是为了节省内存和优化包体大小。 1.0 文件格式和压缩格式 原始音频资源尽量采用WAV格式。 移动平台音频尽量采用Vorbis压缩格式&#x…