您现在的位置是:首页 > 文章详情

基于 Serverless 工作流高并发批量解冻 OSS 文件

日期:2020-03-18点击:532

前言

Serverless 工作流(Serverless Workflow,原函数工作流)是一个用来协调多个分布式任务执行的全托管 Serverless 云服务,致力于简化开发和运行业务流程所需要的任务协调、状态管理以及错误处理等繁琐工作,让用户聚焦业务逻辑开发。用户可以用顺序、分支、并行等方式来编排分布式任务,服务会按照设定好的顺序可靠地协调任务执行,跟踪每个任务的状态转换,并在必要时执行用户定义的重试逻辑,以确保工作流顺利完成。

函数计算 FC 是事件驱动的全托管计算服务,无需采购服务器和运维,只需上传代码就能实现高可用、高并发、弹性伸缩的后端服务。

本文介绍如何使用 Serverless 工作流来高并发大批量的解冻 oss 归档存储文件,使用工作流的优势:

  • 高并发
  • 错误自动重试,高可靠性
  • 每个步骤都有输入输出记录以及实时执行状态,高可观测性

应用中心一键部署

  1. 前往 Serverless 工作流应用中心 创建并部署 OSS Restore 应用。
    image
  2. 部署完成后执行流程 {stackName}-mainRestoreFlow-{suffix},输入:

    { "endpoint": "", "bucketName": "", "prefix": "", "marker": "", "maxKeys": 100, "pollInterval": 10, "workers": 10, "groupSize": 1 }

    执行参数说明:

    • endpoint
      OSS endpoint
    • bucketName:
      OSS bucket 名称
    • prefix:
      OSS bucket 文件过滤前缀
    • maxKeys:
      OSS ListObjects 返回的最大文件数量 (这里不要超过 foreach 的并发限制,默认是 100)
    • pollInterval:
      轮询 OSS 文件 restore 状态的时间间隔(秒)
    • groupSize:
      一个任务步骤(对应一个函数)批量处理的文件数
    • workers:
      多个文件在一个函数中处理时,设定的处理线程池大小
    • marker:
      OSS ListObjects 的开始 marker

    详细信息可参考 ListObjects API

流程执行完毕后,会自动解冻所有符合过滤条件的归档存储文件。

原理

解冻文件主流程

主流程 mainRestoreFlow 主要做以下事情:

  1. 任务步骤 listArchiveFiles 调用 FC 函数 listArchiveFilesmarker 开始执行 ListObjects 列举指定 maxKeys 数量,前缀为 prefix 的 OSS 文件。
  2. 调用子流程 restoreFlow 对获取的文件列表进行解冻,并返回下一次列举的起点 maker
  3. 选择步骤 checkEnd 检测是否已完成列举 bucket 中所有的文件,若没有,跳转到 listArchiveFiles 中从下一个 marker 开始继续执行,否则结束。

主流程定义,请参考 流程定义语言

version: v1 type: flow steps: - type: pass name: init outputMappings: - source: $input.endpoint target: endpoint - source: $input.bucketName target: bucketName - source: $input.prefix target: prefix - source: $input.maxKeys target: maxKeys - source: $input.pollInterval target: pollInterval - source: $input.marker target: marker - source: $input.workers target: workers - source: $input.groupSize target: groupSize # List archive files from marker - type: task name: listArchiveFiles resourceArn: acs:fc:::services/<serviceName>/functions/listArchiveFiles outputMappings: - source: $local.bucketName target: bucketName - source: $local.filesGroup target: filesGroup - source: $local.marker target: marker - source: $local.end target: end - source: $local.empty target: empty - source: $local.archiveFilesCount target: archiveFilesCount # Check whether file restore success, if not, retry check - type: choice name: checkEmpty choices: # If list archive files not empty - condition: $.empty == "false" steps: # Invoke subflow restore to restore listed files - type: task name: invokeRestoreFlow resourceArn: acs:fnf:::flow/<restoreFlow> pattern: sync serviceParams: Input: $ default: goto: checkEnd # Check list files ended - type: choice name: checkEnd choices: - condition: $.end == "true" goto: success default: goto: listArchiveFiles # success - type: pass name: success

解冻文件子流程

子流程 restoreFlow 主要做以下事情:

  1. 并行循环步骤 foreach,并行的对主流程中传入的文件列表生成多个解冻任务。
  2. 解冻任务 restoreTask 调用 FC 函数 restore 对文件列表进行解冻。
  3. 所有解冻任务提交完成后,循环执行:

    • 等待步骤 Wait 等待一段时间
    • 任务步骤 GetJobStatus 获取所有文件的解冻状态
    • 选择步骤 CheckJobComplete 判断是否全部解冻完成,若完成执行结束,否则跳转到 wait 步骤继续循环检测。

子流程定义如下:

version: v1 type: flow steps: - type: foreach name: retoreForeach iterationMapping: collection: $.filesGroup item: files steps: # Invoke restore function - type: task name: restoreTask resourceArn: !Ref OSSRestoreService/restoreFunction retry: - errors: - FC.ResourceThrottled - FC.ResourceExhausted - FC.InternalServerError - FC.Unknown - FnF.TaskTimeout intervalSeconds: 1 maxAttempts: 10 multiplier: 1.5 maxIntervalSeconds: 10 # Wait interval for poll files restore status - type: wait name: Wait duration: $.pollInterval # Get file restore status - type: task name: GetJobStatus resourceArn: !Ref OSSRestoreService/restoreStatusFunction # Check whether file restore success, if not, retry check - type: choice name: CheckJobComplete inputMappings: - target: status source: $local.status choices: - condition: $.status == "success" goto: JobSucceeded - condition: $.status == "running" goto: Wait - type: succeed name: JobSucceeded outputMappings: - target: filesGroup source: $input.filesGroup - target: marker source: $input.marker

以上应用代码可参考 oss-restore

总结

使用 Serverless 工作流能极大减少重复的流程控制开发,让错误重试更加容易,以及提供实时的执行进度查询。这里提供的解冻 oss 应用,完全可以照搬到其它的需求上,以实现高并发高可靠性的应用。

欢迎加钉钉群 23116481 交流:
image

原文链接:https://yq.aliyun.com/articles/750435
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章