ACK One Argo工作流:实现动态 Fan-out/Fan-in 任务编排
作者:庄宇
什么是 Fan-out Fan-in
在工作流编排过程中,为了加快大任务处理的效率,可以使用 Fan-out Fan-in 任务编排,将大任务分解成小任务,然后并行运行小任务,最后聚合结果。
由上图,可以使用 DAG(有向无环图)编排 Fan-out Fan-in 任务,子任务的拆分方式分为静态和动态,分别对应静态 DAG 和动态 DAG。动态 DAG Fan-out Fan-in 也可以理解为 MapReduce。每个子任务为 Map,最后聚合结果为 Reduce。
静态 DAG: 拆分的子任务分类是固定的,例如:在数据收集场景中,同时收集数据库 1 和数据库 2 中的数据,最后聚合结果。
动态 DAG: 拆分的子任务分类是动态的,取决于前一个任务的输出结果,例如:在数据处理场景中,任务 A 可以扫描待处理的数据集,为每个子数据集(例如:一个子目录)启动子任务 Bn 处理,当所有子任务 Bn 运行结束后,在子任务 C 中聚合结果,具体启动多少个子任务 B 取决由任务 A 的输出结果。根据实际的业务场景,可以在任务 A 中自定义子任务的拆分规则。
ACK One 分布式工作流 Argo 集群
在实际的业务场景中,为了加快大任务的执行,提升效率,往往需要将一个大任务分解成数千个子任务,为了保证数千个子任务的同时运行,需要调度数万核的 CPU 资源,叠加多任务需要竞争资源,一般 IDC 的离线任务集群难以满足需求。例如:自动驾驶仿真任务,修改算法后的回归测试,需要对所有驾驶场景仿真,每个小驾驶场景的仿真可以由一个子任务运行,开发团队为加快迭代速度,要求所有子场景测试并行执行。
如果您在数据处理,仿真计算和科学计算等场景中,需要使用动态 DAG 的方式编排任务,或者同时需要调度数万核的 CPU 资源加快任务运行,您可以使用阿里云 ACK One 分布式工作流 Argo 集群 [ 1] 。
ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow [ 2] ,提供售后支持,支持动态 DAG Fan-out Fan-in 任务编排,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,运行完成后及时回收资源节省成本。支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。
Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。
本文介绍使用 Argo Workflow 编排动态 DAG Fan-out Fan-in 任务。
Argo Workflow 编排 Fan-out Fan-in 任务
我们将构建一个动态 DAG Fan-out Fan-in 工作流,读取阿里云 OSS 对象存储中的一个大日志文件,并将其拆分为多个小文件(split),启动多个子任务分别计算每个小文件中的关键词数量(count),最后聚合结果(merge)。
-
创建分布式工作流 Argo 集群 [ 3] 。
-
挂载阿里云 OSS 存储卷,工作流可以像操作本地文件一样,操作阿里云 OSS 上的文件。参考:工作流使用存储卷 [ 4] 。
-
使用以下工作流 YAML 创建一个工作流,参考:创建工作流 [ 5] 。具体说明参见注释。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dynamic-dag-map-reduce-
spec:
entrypoint: main
# claim a OSS PVC, workflow can read/write file in OSS through PVC.
volumes:
- name: workdir
persistentVolumeClaim:
claimName: pvc-oss
# how many tasks to split, default is 5.
arguments:
parameters:
- name: numParts
value: "5"
templates:
- name: main
# DAG definition.
dag:
tasks:
# split log files to several small files, based on numParts.
- name: split
template: split
arguments:
parameters:
- name: numParts
value: "{{workflow.parameters.numParts}}"
# multiple map task to count words in each small file.
- name: map
template: map
arguments:
parameters:
- name: partId
value: '{{item}}'
depends: "split"
# run as a loop, partId from split task json outputs.
withParam: '{{tasks.split.outputs.result}}'
- name: reduce
template: reduce
arguments:
parameters:
- name: numParts
value: "{{workflow.parameters.numParts}}"
depends: "map"
# The `split` task split the big log file to several small files. Each file has a unique ID (partId).
# Finally, it dumps a list of partId to stdout as output parameters
- name: split
inputs:
parameters:
- name: numParts
container:
image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
command: [python]
args: ["split.py"]
env:
- name: NUM_PARTS
value: "{{inputs.parameters.numParts}}"
volumeMounts:
- name: workdir
mountPath: /mnt/vol
# One `map` per partID is started. Finds its own "part file" and processes it.
- name: map
inputs:
parameters:
- name: partId
container:
image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
command: [python]
args: ["count.py"]
env:
- name: PART_ID
value: "{{inputs.parameters.partId}}"
volumeMounts:
- name: workdir
mountPath: /mnt/vol
# The `reduce` task takes the "results directory" and returns a single result.
- name: reduce
inputs:
parameters:
- name: numParts
container:
image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
command: [python]
args: ["merge.py"]
env:
- name: NUM_PARTS
value: "{{inputs.parameters.numParts}}"
volumeMounts:
- name: workdir
mountPath: /mnt/vol
outputs:
artifacts:
- name: result
path: /mnt/vol/result.json
- 动态 DAG 实现
1)split 任务在拆分大文件后,会在标准输出中输出一个 json 字符串,包含:子任务要处理的 partId,例如:
["0", "1", "2", "3", "4"]
2)map 任务使用 withParam 引用 split 任务的输出,并解析 json 字符串获得所有 {{item}},并使用每个 {{item}} 作为输入参数启动多个 map 任务。
- name: map
template: map
arguments:
parameters:
- name: partId
value: '{{item}}'
depends: "split"
withParam: '{{tasks.split.outputs.result}}'
更多定义方式,请参考开源 Argo Workflow 文档 [ 6] 。
- 工作流运行后,通过分布式工作流 Argo 集群控制台 [ 7] 查看任务 DAG 流程与运行结果。
- 阿里云 OSS 文件列表,log-count-data.txt 为输入日志文件,split-output,cout-output 中间结果目录,result.json 为最终结果文件。
- 示例中的源代码可以参考:AliyunContainerService GitHub argo-workflow-examples [ 8] 。
总结
Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。
阿里云 ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow,提供售后支持,加固控制面实现数万子任务(Pod)稳定高效调度运行,数据面支持无服务器方式调度云上大规模算力,无需运维集群或者节点,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。
欢迎加入 ACK One 客户交流钉钉群与我们进行交流。(钉钉群号:35688562)
相关链接:
[1] 阿里云 ACK One 分布式工作流 Argo 集群
https://help.aliyun.com/zh/ack/overview-12
[2] Argo Workflow
https://argo-workflows.readthedocs.io/en/latest/
[3] 创建分布式工作流 Argo 集群
https://help.aliyun.com/zh/ack/create-a-workflow-cluster
[4] 工作流使用存储卷
https://help.aliyun.com/zh/ack/use-volumes
[5] 创建工作流
https://help.aliyun.com/zh/ack/create-a-workflow
[6] 开源 Argo Workflow 文档
https://argo-workflows.readthedocs.io/en/latest/walk-through/loops/
[7] 分布式工作流 Argo 集群控制台
[8] AliyunContainerService GitHub argo-workflow-examples
https://github.com/AliyunContainerService/argo-workflow-examples/tree/main/log-count
关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Paperless-ngx —— 开源文档管理系统
Paperless-ngx是一个社区支持的开源文档管理系统,可将你的物理文档转换为可搜索的在线档案,从而减少纸张的使用。该软件旨在使“无纸化”变得更容易。 特性 使用标签、通讯录、类型等组织和索引扫描的文档。 对你的文档执行 OCR,将可选择的文本添加到仅图像文档中,并向你的文档添加标签、通讯录和文档类型。 支持 PDF 文档、图像、纯文本文件和 Office 文档(Word、Excel、Powerpoint 和 LibreOffice 等效项)。 Office 文档支持是可选的,由 Apache Tika 提供(参阅配置) 无纸化将你的文档直接存储在磁盘上。文件名和文件夹采用无纸化管理,格式可自由配置。 单页应用程序前端。 包括一个显示基本统计数据并具有文档上传功能的仪表板。 按标签、通讯录、类型等进行过滤。 可以保存自定义视图并将其显示在仪表板上。 全文搜索可帮助你找到所需内容。 自动完成会建议文档中的相关单词。 结果按与你的搜索查询的相关性排序。 突出显示可以显示文档的哪些部分与查询匹配。 搜索类似文档(“更多类似内容”) 电子邮件处理:无纸化添加来自你的电子邮件帐户的文档。 配...
-
下一篇
研发误删的库,凭什么要 DBA 承担责任
镇楼图 三个角色 删库以及更宽泛的数据库变更场景中有三个角色,业务研发,DBA 以及使用的数据库变更工具: 业务研发通常指的是后端研发。国内最主流的技术栈还是 Java,此外 Go 也有一部分,另有全栈的则使用 Node。这些语言通常会配备对应的 ORM 和数据库打交道,Java 的 MyBatis,Go 的 GORM,Node 的 TypeORM 等。 DBA 就是数据库管理员。有些公司即使没有全职 DBA,也会有看着数据库的那个人。 数据库变更工具。公司业务稍微上了规模,一般会选择在专门的数据库变更工具上执行操作,开源的产品里比较主流的有 Archery, Yearning, Bytebase。 生命周期 交代完出场角色,我们再来说一下,数据库变更的整个生命周期: 研发在数据库变更工具上提交了一个变更工单。 工具可能进行一些自动化检测,修改字段会提示锁表,删库,删表会警告破坏应用代码的兼容性。 DBA 进行审核。 审核通过后,进行发布。 告警铺天盖地/客诉蜂拥而来,业务一排查,怀疑可能是之前的数据库变更引起的,拉上 DBA,实锤。于是再一起制定补救方案。 经过几天的奋战,最终修复了...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- MySQL表碎片整理
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Crontab安装和使用
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- SpringBoot2全家桶,快速入门学习开发网站教程
- Docker安装Oracle12C,快速搭建Oracle学习环境




微信收款码
支付宝收款码