通过Job Committer保证Mapreduce/Spark任务数据一致性
并发地向目标存储系统写数据是分布式任务的一个天然特性,通过在节点/进程/线程等级别的并发写数据,充分利用集群的磁盘和网络带宽,实现高容量吞吐。并发写数据的一个主要需要解决的问题就是如何保证数据一致性的问题,具体来说,需要解决下面列出的各个问题:
- 在分布式任务写数据的过程中,如何保证中间数据对外不可见。
- 在分布式任务正常完成后,保证所有的结果数据同时对外可见。
- 在分布式任务失败时,所有结果数据对外不可见且能正确清理。
- 开启预测执行时,保证多个执行相同任务的task只有一份结果数据在最终结果中。
此外,还要一些作业的异常情况需要处理,例如task失败重试,作业重启等等。Job Committer是MapReduce用来实现分布式写入一致性的保证,通过Job Committer的各种实现,保证MapReduce任务在各种异常场景中数据写出的一致性。Spark支持MapReduce的JobCommitter,同样也是通过JobCommitter实现Spark作业写出数据的一致性。
JobCommitter接口
MapReduce有V1和V2两套API接口,在包名中以mapred
和mapreduce
区分,v1和v2版本的JobCommitter抽象接口基本一致,下面以org.apache.hadoop.mapreduce.OutputCommitter
为例介绍主要的接口定义:
Modifier and Type | Method and Description |
---|---|
abstract void | setupJob(JobContext jobContext) For the framework to setup the job output during initialization. |
void | commitJob(JobContext jobContext) For committing job's output after successful job completion. |
void | abortJob(JobContext jobContext, org.apache.hadoop.mapreduce.JobStatus.State state) For aborting an unsuccessful job's output. |
boolean | isCommitJobRepeatable(JobContext jobContext) Returns true if an in-progress job commit can be retried. |
abstract void | setupTask(TaskAttemptContext taskContext) Sets up output for the task. |
abstract void | commitTask(TaskAttemptContext taskContext) To promote the task's temporary output to final output location. |
abstract void | abortTask(TaskAttemptContext taskContext) Discard the task output. |
abstract boolean | needsTaskCommit(TaskAttemptContext taskContext) Check whether task needs a commit. |
boolean | isRecoverySupported(JobContext jobContext) Is task output recovery supported for restarting jobs? If task output recovery is supported, job restart can be done more efficiently. |
void | recoverTask(TaskAttemptContext taskContext) Recover the task output. |
根据接口的调用时机和顺序,我们可以大致梳理出MapReduce任务是如何通过JobCommitter的工作机制。
- 在job初始化时,调用setupJob,进行一些作业级别的初始化工作,例如设置job的工作目录等等。
- 如果已有相同作业正在执行,调用isCommitJobRepeatable判断是否继续。
- 在task初始化时,调用setupTask,进行一些作业级别的初始化工作,例如设置task工作目录,task输出目录等。
- 如果task输出已存在,通过isRecorverySupport判断是否支持recovery,是的话,调用recoverTask,避免task的计算。
- 如果task执行失败,调用abortTask,清理task输出。
- 如果task执行成功,调用commitTask。
- 如果所有task都全部完成,调用commitJob。
- 如果job失败,调用abortJob。
可以看到,JobCommitter的基本机制是基于一种类似于分布式数据库中的两阶段提交协议的方式,task首先commit,主要的工作在task中完成,在appmaster收到所有task成功提交的信息后,进行job commit完成最后的提交工作。通过两阶段提交协议实现数据一致性有两个主要的需求需要满足:
- 在commit job以前,数据对外不可见,且可回退。
- commit job过程要尽量短,最好是原子操作,较长的commit job过程,中间发生失败的风险较大,一旦失败,会导致数据处于某种中间状态,无法满足数据一致性的要求。
在MapReduce中,FileOutputCommitter
是最常使用的一个Job Commiter实现,在写入数据到HDFS上时,完全满足两阶段提交协议的两个要求。
FileOutputCommitter
下面简单介绍FileOutputCommitter
主要接口的一些具体实现细节。FileOutputCommitter
主要涉及到四个目录:
- 最终目录:$dest/
- Job临时目录:$dest/_temporary/$appAttemptId/
- Task临时目录:$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID/
- Task输出目录:$dest/_temporary/$appAttemptId/$taskAttemptID/
整个JobCommitter执行过程如图所示:
- setupJob:设置Job临时目录。
- setupTask:确定Task临时目录和输出目录。
- commitTask:将Task临时目录rename到输出目录。
- abortTask:清理Task临时目录。
- commitJob:将Job临时目录中的数据(包含所有Task输出目录中的文件)合并到Job最终目录。
- abortJob:清理Job临时目录。
根据以上FileOutputCommitter的实现,在可以看到,在commitJob之前,所有mapreduce任务写的数据都在临时目录中,读取Job最终目录不会读到临时数据,在Job执行的任意过程失败,清理临时目录文件即可。FileOutputCommitter在Job执行的过程中,每一个产生的文件需要进行两次Rename操作,第一次是commitTask,在Task中执行,多个节点中执行的task可以并发地进行Rename。第二次是commitJob,MapReduce或者Spark的Job Driver端执行的,是个单点操作。在commitJob时,由于需要将Job临时目录中的文件移动到最终目录,会有一个时间窗口,在过程中失败的话,会导致部分数据对外可见,这个时间窗口随着文件数量的增加也会随之增加。对于HDFS这类分布式文件系统来说,rename是一个十分高效的操作,只涉及到NameNode上相关元数据的修改,所以这个时间窗口非常小,可以满足绝大部分场景的需求。
在对于S3,OSS等公有云上的对象存储系统来说,并不直接支持Rename操作,文件系统级别的Rename操作一般会转换成Copy+Delete操作,这个代价相对于HDFS会大大增加。commitJob是在MapReduce或者Spark的Job Driver端执行的,是个单点操作,虽然有实现线程级别的并发优化,但是在写入S3/OSS的场景中,commitJob的时间窗口会非常长,文件数量较大时,可能达到分钟,甚至小时级别,这对于Job的性能会产生严重的影响,为了解决写S3/OSS等对象存储系统的性能问题,Hadoop社区引入了FileOutputCommitter V2版本。
FileOutputCommitter V2
FileOutputCommitter V2版本整个job commit的过程如下:
- setupJob:设置Job临时目录。
- setupTask:确定Task临时目录。
- commitTask:将Task临时目录文件rename到Job最终目录。
- abortTask:清理Task临时目录。
- commitJob:无需Rename操作。
- abortJob:清理Job临时目录。
可以看到在V2版本中,最大的区别是去掉了Task输出目录,在commitTask的时候将文件直接rename到Job最终目录,整个Job Commit过程,对于所有的文件只需进行一次Rename操作,而且Rename操作是在集群节点的所有task上并发执行的,消除了Job Driver单点执行rename的瓶颈。
FileOutputCommitter V2在写入数据到S3/OSS等场景中大大提高了性能,但是由于byPass了Task输出目录,无法保证数据的一致性,在Job执行过程中,部分文件就移动到了Job最终目录。当部分task成功,部分task失败时,也会在最终目录中残留中间文件。
针对写入S3/OSS等的场景,Hadoop社区和各个工业界也都提出了非常多的解决方案,基本的目标是保证数据一致性的前提下,完全避免Rename操作。下面主要介绍S3ACommitter和JindoOssCommitter,分别是hadoop社区和阿里云EMR团队针对S3和OSS实现的Job Committer,主要是基于S3/OSS的Multipart Upload特性实现,基本思想一致,在这里一并介绍。此外,还有Databricks基于DBIO的方案,Netflix的Staging committer方案等等,篇幅有限,这里就不过多介绍了。
对象存储系统的Multipart Upload
除了通过PUT Object接口上传文件到S3/OSS以外,S3/OSS还提供了另外一种上传模式——Multipart Upload。主要应用在文件较大,需要断点上传或者网络不好等场景中,以OSS为例,Multipart Upload上传的流程如下:
- InitiateMultipartUpload:使用Multipart Upload模式传输数据前,必须先调用该接口来通知OSS初始化一个Multipart Upload事件。指定目标文件地址作为参数,获取一个uploadId用作后续upload使用。
- UploadPart:初始化一个MultipartUpload之后,可以根据指定的Object名和Upload ID来分块(Part)上传数据。可重复调用uploadPart接口上传不同的分块数据,而且可以并发调用。
- CompleteMultipartUpload:在将所有数据Part都上传完成后,必须调用CompleteMultipartUpload接口来完成整个文件的MultipartUpload。完成completeMultipartUpload后,文件在oss上对外可见,在completeMultipartUpload返回之前,该文件对外不可见。
- AbortMultipartUpload:AbortMultipartUpload接口用于终止MultipartUpload事件,在CompleteMultipartUpload之前可随时中止MultipartUpload。
- ListMultipartUploads:ListMultipartUploads用来列举所有执行中的Multipart Upload事件,即已经初始化但还未Complete或者Abort的Multipart Upload事件。
基于Multipart Upload的No-Rename Committer实现
通过Multipart Upload功能提供的支持,结合S3/Oss文件系统层面的定制支持,可以实现在保证数据一致性前提下无需Rename操作的Job Committer实现,具体的Job Commit流程如下:
- setupJob:设置Job临时目录。
- setupTask:设置Task临时目录,Task执行过程中写文件使用MultiUpload接口直接写到Job最终目录,在close文件时,不调用CompleteMultipartUpload接口,将所有Upload分块信息记录在Task临时目录的文件中。
- commitTask:将Task临时目录文件中的多个文件Upload分块信息合并成一个文件,写到Job临时目录。
- abortTask:清理Task临时目录,使用AbortMultipartUpload接口,abort所有该task写的文件。
- commitJob:访问Job临时目录中所有的Upload分块信息,调用CompleteMultipartUpload接口,完成所有文件的MultipartUpload。
- abortJob:调用ListMultipartUploads,abort所有该task写的文件分块,清理Job临时目录。
在Task执行过程中,由于通过Multipart Upload相关接口初始化upload和上传分块数据,但是知道commitJob时,才会调用CompleteMultipartUpload。根据Multipart Upload特性,在调用CompleteMultipartUpload前文件是不可见的,从而保证了数据一致性。同FileOutputCommitter类似,由于有多个文件需要CompleteMultipartUpload,在commitJob时也会有一个可能导致数据不一致的时间窗口。文件的上传过程都已经在task中分布式的完成了,在Job Driver中commitJob时CompleteMultipartUpload是一个非常轻量级的请求,所以这个时间窗口会非常短,失败的可能较低,可以满足绝大部分业务场景的需求。对比FileOutputCommitter V1,在jobCommit时,CompleteMultipartUpload相对于Rename代价小很多,可能导致数据不一致的时间窗口也会少很多。对比FileOutputCommitter V2,V2并不保证数据一致性,JindoOssCommitter可以适用于更多对数据一致性有要求的场景。
性能方面,这种方式分布式的在task中并发写数据到OSS中,并且不需要Rename操作,对比FileOutputCommitter V1/V2分别需要的两次和一次Rename操作,也有大幅的性能提升。
总结
通过对象存储系统普遍提供的Multipart Upload功能,实现的No-Rename Committer在数据一致性和性能方面相对于FileOutputCommitter V1/V2版本均有较大提升,在使用MapRedcue和Spark写入数据到S3/Oss的场景中更加推荐使用。S3ACommitter在Hadoop社区版本的3.1.2中已经可以使用,JindoOssCommitter也在阿里云的EMR环境2.5.0以上版本中默认开启。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
免费试用Serverless容器3个月(新用户免费领100元代金劵)
免费使用Serverless容器服务 在容器化和Kubernetes为中心的云原生行业大潮下,小伙伴们不再纠结应用是否需要容器化,而是思考着在云上如何更好的管理自己的容器应用,以及如何更好的提升弹性,降低计算成本,和加速应用开发的敏捷性。 阿里云容器服务团队联合弹性计算团队近日推出了面向新用户使用Serverless容器三个月的免费活动,无需购买ECS服务器,开发者就可以使用100元代金劵快速部署自己的web服务、执行Job任务或者简单的运行docker镜像,体验serverless容器的简单、便捷、弹性和低成本。 云上容器使用之旅 相信很多开发者在应用容器化过程中都经历过多个阶段,最初是在本地开发环境中学习docker,比如docker pull/push/run等基础命令,这时我们已经初步认识到容器的优势,build once, run everywhere。然后我们尝试把容器化的应用搬到云上,此时可以选择购买ECS服务器,在ECS中部署docker启动容器,这样一个基础的容器化过程就完成了,然而在单个ECS之中运行容器应用是不够的,需要解决很多生产级别的架构问题,比如ECS宕机后...
- 下一篇
浅谈企业数据目录
最近又遇到一个数据相关的咨询项目,为一家企业整理数据服务目录,今天就来讨论下企业数据目录架构和它的部署方式。 企业数据目录(EDC)旨在帮助企业与IT人员通过统一的元数据视图(包括技术元数据、业务元数据、用户释义、关联关系、数据质量和用途)来释放企业数据资产的最大能量。 我们从下至上来看下EDC的一个架构,最下面是存储层,在这一层,EDC包含了传统的结构化数据库用来存储EDC的管理员数据、可视化配置数据、数据域的规则,runtime统计数据等等,其中一部分结构化数据来自于各接入应用的元数据,称为模型库服务(Model Repository Service,MRS)使所有接入的应用可以在一个关系型数据库中进行协同;另一部分结构化数据称为数据剖析仓库(Profiling Warehouse,PWH),用来存储数据剖析信息,例如剖析结果和计分卡结果。在存储层EDC也可以接入各种非结构化数据,例如Hadoop分布式存储系统以及其上的HBASE等开源产品。 往上一层,对于接入的结构化数据的数据源,有剖析引擎(Data Profiling Engine)对数据集的唯一性,特征值频率以及数据集所属的...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8编译安装MySQL8.0.19
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Hadoop3单机部署,实现最简伪集群