初探大数据处理 on Kubernetes
背景
自2003年Google的三大核心技术GFS(03)、MapReduce(04)、和BigTable(06)的论文陆续发表至今,以Hadoop为代表的大数据处理框架,开始登上历史的舞台,迎来了一个黄金时代。Apache Hadoop是其中最为成功的开源项目,让企业级的大数据处理能力变得唾手可得。围绕Hadoop的学术研究和工业界的探索在过去的十多年里一直保持着火热。
而在另一条时间线上,容器技术在Docker问世后,终于等来了快速发展的6年。与此同时,Kubernetes作为容器编排的开源系统,在过去几年经过一番混战,并借助CNCF社区的推动以及云原生的兴起,也很快成为了业界容器编排的事实标准。如今,几乎所有的云厂商都有一套围绕Kubernetes的容器生态,例如我们阿里云就有ACK、ASK(Serverless Kubernetes)、EDAS、以及ECI(阿里云弹性容器实例)。
ASF (Apache Software Foundation) 和CNCF(Cloud Native Computing Foundation),两大相对独立的阵营悄然步入到了一个历史的拐点,我们都期待他们之间会碰撞出怎样的火花。显然,Spark2.3.0 开始尝试原生支持on Kubernetes就是一个重要的时间节点。本文就是主要分享最近调研Spark on Kubernetes的一些总结。
从Hadoop说起
Hadoop主要包含以下两个部分:Hadoop Distributed File System (HDFS) 和一个分布式计算引擎,该引擎就是Google的 MapReduce思想的一个实现 。Hadoop一度成为了大规模分布式数据存储和处理的标椎。
Hadoop to Spark
Hadoop在被业界广泛使用的同时,也一直存在很多的问题:
1、只支持Map和Reduce算子,复杂的算法、业务逻辑很难表达,最终只能将逻辑写入算子里面,除了代码不宜维护,还导致调度上没有任何优化空间,只能根据任务数单一纬度来调度。
2、计算的中间结果也要存入HDFS,不必要的IO开销。
3、 TaskTracker 将资源划分为map slot和reduce slot,不够灵活,当缺少某个stage的时候会严重降低资源利用率。
4、…
关于Hadoop的研究也基本是围绕资源调度、MapReduce计算模式、HDFS存储、以及通用性等方面的优化,Spark便是众多衍生系统中最成功的一个。甚至可以说是里程碑级别的,从此关于Hadoop的研究沉寂了很多。2009年由加州大学伯克利分校的AMPLab开发的Spark问世,便很快成为Apache的顶级开源项目。Apache Spark 是一个基于内存计算、支持远比MapReduce复杂算子、涵盖批流等多种场景的大数据处理框架。
梳理下Spark中一些主要的概念:
- Application:Spark Application的概念和Hadoop中的 MapReduce类似,指的是用户编写的 Spark 应用程序,相比于Hadoop支持更丰富的算子,而且利用内建的各种库可以很方便开发机器学习、图计算等领域的应用。
- Job:由大量的Task组成的并行计算作业,一个作业通常包含一批RDD及作用于相应RDD上的各种算子。
- Stage:每个作业都会被拆分成很多组Task,每组Task即为一个TaskSet,也被称为Stage,一个作业分为多个Stage。
- Task: 被指定到某个Executor上的执行的任务,Task可以理解为一段逻辑,等待被调度到Excutor的某个线程中执行。
- Operations:即算子,分为1)Action,比如:reduce、collect、count等;2)Transformation,比如:map、join、reduceByKey等。Action会将整个作业切割成多个Stage。
- Executor:Application运行在Worker节点上的一个进程,该进程负责运行Task,每个Application都有各自的一批Executor。Executor的数量可以静态设定好,也可以采用动态资源分配。
- Driver:Spark中的Driver根据提交的Application创建SparkContext,即准备程序的运行环境。SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配等;当所有Executor全部执行完毕后,Driver负责将SparkContext关闭。
- Worker:集群中任何可以运行Application任务的节点。
- Cluster Manager:集群中调度资源的服务。Standalone模式下为Master;Yarn模式下为Yarn中的ResourceManager。
Hadoop to YARN
早期的Hadoop大规模集群也可以达到几千个节点,当数据处理需求不断增长的时候,粗暴的增加节点已经让原生调度系统非常吃力。Application管理和Resource管理的逻辑全部放在Hadoop的 JobTracker中,而 JobTracker又不具备横向扩展的能力,这让JobTracker不负重堪。需要一套方案能将Application管理和Resource管理职责分开,能将计算模式和 JobTracker解耦,YARN就是在这样的背景下诞生的。如今我们常听到的Hadoop其实已经是指Yarn了。
Spark调度在最初设计的时候,就是开放式的,而且调度模块之间的关系跟YARN的概念非常吻合。
Spark Master和ResourceManager对应,Spark Worker和NodeManager对应,Spark Driver和Application Master对应,Spark Executor和Container对应。每个Executor能并行运行Task的数量就取决于分配给它的Container的CPU核数。
Client提交一个应用给 Yarn ResourceManager后, Application Manager接受请求并找到一个Container创建该应用对应的Application Master,Application Master会向ResourceManager注册自己,以便client访问。Application Master上运行的就是Spark Driver。Application Master申请 Container并启动,Spark Driver然后在Container里启动 Spark Executor,并调度Spark Task到Spark Executor上的线程执行。等到所有的Task执行完毕后,Application Master取消注册并释放资源。
带来的好处
1、YARN作为集群统一的资源调度和应用管理层,降低了资源管理的复杂性的同时,对所有应用类型都是开放的,即支持混部MapReduce、Spark等,能提高整个集群的资源利用率。
2、两级调度方式,大大降低了ResourceManager的压力,增加了集群的扩展能力。
3、计算模式和资源调度解耦。在调度层,屏蔽了MapReduce、Spark、Flink等框架的计算模式的差异,让这些框架都只用专注于计算性能的优化。
4、可以使用YARN的高级功能,比如:1)原生FIFO之外的调度策略: CapacityScheduler & FairScheduler;2)基于队列的资源隔离与分配等。
YARN to Kubernetes
Hadoop和Spark能成为现在使用最广泛的大数据处理框架,离不开Yarn的强大。虽然也有人诟病它的悲观锁导致并发粒度小、二层调度资源可见性等问题,但是除此之外,Yarn就本身来说并没有什么大的缺陷,依然是大数据领域的调度底座的首选。历史往往就是如此,霸主都不是被对手干到,而是被那些一开始看似其他领域的新兴力量淘汰。这就是如今谷歌主导的kubernetes生态发展到一定的程度之后,Yarn必然要去面对的挑战:如果未来,一家公司80%的业务都已经统一在Kubernetes上跑,它还会原意为剩下的20%的大数据的业务单独维护一个Yarn集群么?
Kubernetes的优势
Spark on kubernetes相比于on YARN等传统部署方式的优势:
1、统一的资源管理。不论是什么类型的作业都可以在一个统一kubernetes的集群运行。不再需要单独为大数据作业维护一个独立的YARN集群。
2、弹性的集群基础设施。资源层和应用层提供了丰富的弹性策略,我们可以根据应用负载需求选择 ECS 虚拟机、神龙裸金属和 GPU 实例进行扩容,除了kubernetes集群本生具备的强大的扩缩容能力,还可以对接生态,比如virtual kubelet。
3、轻松实现复杂的分布式应用的资源隔离和限制,从YRAN复杂的队列管理和队列分配中解脱。
4、容器化的优势。每个应用都可以通过docker镜像打包自己的依赖,运行在独立的环境,甚至包括Spark的版本,所有的应用之间都是隔离的。
5、大数据上云。目前大数据应用上云常见的方式有两种:1)用ECS自建YARN(不限于YARN)集群;2)购买EMR服务。如今多了一个选择——Kubernetes。既能获得完全的集群级别的掌控,又能从复杂的集群管理、运维中解脱,还能享受云所带来的弹性和成本优势。
Spark自2.3.0开始试验性支持Standalone、on YARN以及on Mesos之外的新的部署方式:Running Spark on Kubernetes ,并在后续的发行版中不断地加强。
后文将是实际的操作,分别让Spark应用在普通的Kubernetes集群、Serverless Kubernetes集群、以及Kubernetes + virtual kubelet等三种场景中部署并运行。
Spark on Kubernetes
准备数据以及Spark应用镜像
参考:
创建kubernetes集群
如果已经有阿里云的ACK集群,该步可以忽略。
具体的创建流程参考:创建Kubernetes 托管版集群。
提交作业
为Spark创建一个RBAC的role
创建账号(默认namespace)
kubectl create serviceaccount spark
绑定角色
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
直接使用spark-submit提交(不推荐的提交方式)
liumihustdeMacBook-Pro:spark-on-k8s liumihust$ ./spark-2.3.0-bin-hadoop2.6/bin/spark-submit --master k8s://121.199.47.XX:6443 --deploy-mode cluster --name WordCount --class com.aliyun.liumi.spark.example.WordCount --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.executor.instances=2 --conf spark.kubernetes.container.image=registry.cn-beijing.aliyuncs.com/liumi/spark:2.4.4-example local:///opt/spark/jars/SparkExampleJava-1.0-SNAPSHOT.jar
参数解释
--master :k8s集群的apiserver,这是决定spark是在k8s集群跑,还是在yarn上跑。
--deploy-mode:driver可以部署在集群的master节点(client)也可以在非master(cluster)节点。
spark.executor.instances: executor的数量
spark.kubernetes.container.image spark打包镜像(包含driver、excutor、应用,也支持单独配置)
提交基本流程
- Spark先在k8s集群中创建Spark Driver(pod)。
- Driver起来后,调用k8s API创建Executors(pods),Executors才是执行作业的载体。
- 作业计算结束,Executor Pods会被自动回收,Driver Pod处于Completed状态(终态)。可以供用户查看日志等。
- Driver Pod只能被用户手动清理,或者被k8s GC回收。
结果分析
执行过程中的截图如下:
我们30G的数据用2个1C1G的Excutor处理了大约20分钟。
作业运行结束后查看结果:
[root@liumi-hdfs ~]# $HADOOP_HOME/bin/hadoop fs -cat /pod/data/A-Game-of-Thrones-Result/* (142400000,the) (78400000,and) (77120000,) (62200000,to) (56690000,of) (56120000,a) (43540000,his) (35160000,was) (30480000,he) (29060000,in) (26640000,had) (26200000,her) (23050000,as) (22210000,with) (20450000,The) (19260000,you) (18300000,I) (17510000,she) (16960000,that) (16450000,He) (16090000,not) (15980000,it) (15080000,at) (14710000,for) (14410000,on) (12660000,but) (12470000,him) (12070000,is) (11240000,from) (10300000,my) (10280000,have) (10010000,were)
至此,已经能在kubernetes集群部署并运行spark作业。
Spark on Serverless Kubernetes
Serverless Kubernetes (ASK) 相比于普通的kubernetes集群,比较大的一个优势是,提交作业前无需提前预留任何资源,无需关心集群的扩缩容,所有资源都是随作业提交自动开始申请,作业执行结束后自动释放。作业执行完后就只剩一个SparkApplication和终态的Driver pod(只保留管控数据)。原理图如下图所示:
ASK通过virtual kubelet调度pod到阿里云弹性容器实例。虽然架构上跟ACK有明显的差异,但是两者都是全面兼容kubernetes标准的。所以on ASK跟前面的spark on kubernetes准备阶段的基本是一致的,即HDFS数据准备,spark base镜像的准备、spark应用镜像的准备等。主要就是作业提交方式稍有不同,以及一些额外的基本环境配置。
创建serverless kubernetes集群
选择标准serverless集群:
基本参数:
1、自定义集群名。
2、选择地域、以及可用区。
3、专有网络可以用已有的也可以由容器服务自动创建的。
4、是否公网暴露API server,如有需求建议开启。
5、开启privatezone,必须开启。
6、日志收集,建议开启。
注:
1、提交之前一定要升级集群的集群的virtual kubelet的版本(新建的集群可以忽略),只有目前最新版的VK才能跑Spark作业。
2、ASK集群依赖privatezone做服务发现,所以集群不需要开启privatezone,创建的时候需要勾选。如果创建的时候没有勾选,需要联系我们帮开启。不然Spark excutor会找不到driver service。
*制作镜像cache
由于后面可能要进行大规模启动,为了提高容器启动速度,提前将Spark应用的镜像缓存到ECI本地,采用k8s标准的CRD的方式,具体的流程参考:使用CRD加速创建Pod
提交:
由于spark submit目前支持的参数非常有限,所以ASK场景中不建议使用spark submit直接提交,而是使用Spark Operator 。在Spark Operator出现之前,也可以采用kubernetes原生的yaml方式提交。后面会分别介绍这两种不同的方式。
方式一:原生的方式,编写yaml
编写自定义的标准的kubernetes yaml创建资源。
我所测试的完整的yaml文件如下(基于Spark 2.3.0):
wordcount-spark-driver-svc.yaml:
apiVersion: v1 kind: ServiceAccount metadata: name: spark-serverless namespace: default --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: spark-serverless-role roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: edit subjects: - kind: ServiceAccount name: spark-serverless namespace: default --- apiVersion: v1 kind: Service metadata: name: wordcount-spark-driver-svc namespace: default annotations: service.beta.kubernetes.io/alibaba-cloud-private-zone-enable: "true" spec: clusterIP: None ports: - name: driver-rpc-port port: 7078 protocol: TCP targetPort: 7078 - name: blockmanager port: 7079 protocol: TCP targetPort: 7079 selector: spark-app-selector: spark-9b7952456a86413b94c70fe2b3f8496c spark-role: driver sessionAffinity: None type: ClusterIP --- apiVersion: v1 kind: Pod metadata: annotations: spark-app-name: WordCount k8s.aliyun.com/eci-image-cache: "true" labels: spark-app-selector: spark-9b7952456a86413b94c70fe2b3f8496c spark-role: driver name: wordcount-spark-driver namespace: default spec: containers: - args: - driver env: - name: SPARK_DRIVER_MEMORY value: 1g - name: SPARK_DRIVER_CLASS value: com.aliyun.liumi.spark.example.WordCount - name: SPARK_DRIVER_ARGS - name: SPARK_DRIVER_BIND_ADDRESS valueFrom: fieldRef: apiVersion: v1 fieldPath: status.podIP - name: SPARK_MOUNTED_CLASSPATH value: >- /opt/spark/jars/SparkExampleJava-1.0-SNAPSHOT.jar:/opt/spark/jars/SparkExampleJava-1.0-SNAPSHOT.jar - name: SPARK_JAVA_OPT_0 value: '-Dspark.submit.deployMode=cluster' - name: SPARK_JAVA_OPT_1 value: '-Dspark.driver.blockManager.port=7079' - name: SPARK_JAVA_OPT_2 value: '-Dspark.master=k8s://https://47.99.132.xxx:6443' - name: SPARK_JAVA_OPT_3 value: '-Dspark.app.id=spark-9b7952456a86413b94c70fe2b3f8496c' - name: SPARK_JAVA_OPT_4 value: '-Dspark.kubernetes.authenticate.driver.serviceAccountName=spark' - name: SPARK_JAVA_OPT_5 value: >- -Dspark.kubernetes.driver.pod.name=wordcount-spark-driver - name: SPARK_JAVA_OPT_6 value: '-Dspark.app.name=WordCount' - name: SPARK_JAVA_OPT_7 value: >- -Dspark.kubernetes.container.image=registry.cn-beijing.aliyuncs.com/liumi/spark:2.3.0-hdfs-1.0 - name: SPARK_JAVA_OPT_8 value: '-Dspark.executor.instances=10' - name: SPARK_JAVA_OPT_9 value: >- -Dspark.jars=/opt/spark/jars/SparkExampleJava-1.0-SNAPSHOT.jar,/opt/spark/jars/SparkExampleJava-1.0-SNAPSHOT.jar - name: SPARK_JAVA_OPT_10 value: >- -Dspark.driver.host=wordcount-spark-driver-svc.default.svc.cluster.local.c132a4a4826814d579c14bf2c5cf933af - name: SPARK_JAVA_OPT_11 value: >- -Dspark.kubernetes.executor.podNamePrefix=wordcount-spark - name: SPARK_JAVA_OPT_12 value: '-Dspark.driver.port=7078' - name: SPARK_JAVA_OPT_13 value: >- -Dspark.kubernetes.executor.annotation.k8s.aliyun.com/eci-image-cache=true - name: SPARK_JAVA_OPT_14 value: >- -Dspark.kubernetes.allocation.batch.size=10 image: 'registry.cn-beijing.aliyuncs.com/liumi/spark:2.3.0-hdfs-1.0' imagePullPolicy: IfNotPresent name: spark-kubernetes-driver resources: limits: memory: 16384Mi requests: cpu: '8' memory: 16Gi terminationMessagePath: /dev/termination-log terminationMessagePolicy: File dnsPolicy: None dnsConfig: nameservers: - 100.100.2.136 - 100.100.2.138 searches: - default.svc.cluster.local.c132a4a4826814d579c14bf2c5cf933af - svc.cluster.local.c132a4a4826814d579c14bf2c5cf933af - cluster.local.c132a4a4826814d579c14bf2c5cf933af - c132a4a4826814d579c14bf2c5cf933af options: - name: ndots value: "5" hostAliases: - ip: "47.99.132.xxx" hostnames: - "kubernetes.default.svc" priority: 0 restartPolicy: Never serviceAccount: spark-serverless serviceAccountName: spark-serverless terminationGracePeriodSeconds: 30 tolerations: - effect: NoExecute key: node.kubernetes.io/not-ready operator: Exists tolerationSeconds: 300 - effect: NoExecute key: node.kubernetes.io/unreachable operator: Exists tolerationSeconds: 300
yaml文件里定义了四个资源:
ServiceAccount:spark-serverless,Driver需要在pod里面访问集群的api server,所以需要创建一个ServiceAccount。不用每次提交都创建。
ClusterRoleBinding:spark-serverless-role,将RBAC的role绑定到这个ServiceAccount,赋予操作资源的权限。不用每次提交都创建。
Service:Driver service,暴露Driver pod。Excutor 就是通过这个service访问Driver的。
Pod:Driver pod,不用定义Excutor pod yaml,Excutor pod的参数通过Driver的环境变量来设置Dspark.kubernetes.*实现。
kubectl 提交:
liumihustdeMacBook-Pro:spark-on-k8s liumihust$ kubectl create -f wordcount-spark-driver-svc.yaml serviceaccount/spark-serverless created clusterrolebinding.rbac.authorization.k8s.io/spark-serverless-role created service/wordcount-spark-driver-svc created pod/wordcount-spark-driver created
方式二:Spark Operator
前面直接通过k8s yaml申明的方式,也能直接利用kubernetes的原生调度来跑Spark的作业,在任何集群只要稍加修改就可以用,但问题是:1)不好维护,涉及的自定义参数比较多,且不够直观(尤其对于只熟悉Spark的用户);2)没有了Spark Application的概念了,都是裸的pod和service,当应用多的时候,维护成本就上来了,缺少统一管理的机制。
Spark Operator 就是为了解决在Kubernetes集群部署并维护Spark应用而开发的,Spark Operator是经典的CRD + Controller,即Kubernetes Operator的实现。Kubernetes Operator诞生的故事也很具有传奇色彩,有兴趣的同学可以了解下 。Operator的出现可以说给有状态的、特定领域的复杂应用 on Kubernetes 打开了一扇窗,Spark Operator便是其中具有代表性的一个。
Spark Operator几个主要的概念:
SparkApplication:标准的k8s CRD,有CRD就有一个Controller 与之对应。Controller负责监听CRD的创建、更新、以及删除等事件,并作出对应的Action。
ScheduledSparkApplication:SparkApplication的升级,支持带有自定义时间调度策略的作业提交,比如cron。
Submission runner:对Controller发起的创建请求提交spark-submit。
Spark pod monitor:监听Spark pods的状态和事件更新并告知Controller。
安装Spark Operator
推荐用 helm 3.0
helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator helm install incubator/sparkoperator --namespace default --set operatorImageName=registry.cn-hangzhou.aliyuncs.com/eci_open/spark-operator --set operatorVersion=v1beta2-1.0.1-2.4.4 --generate-name --set enableWebhook=true
安装完成后可以看到集群多了个spark operator pod。
选项说明:
1、--set operatorImageName:指定operator镜像,默认的google的镜像阿里云ECI内拉不下来,可以先拉取到本地然后推到ACR。
2、--set operatorVersion operator:镜像仓库名和版本不要写在一起。
3、--generate-name 可以不用显式设置安装名。
4、--set enableWebhook 默认不会打开,对于需要使用ACK+ECI的用户,会用到nodeSelector、tolerations这些高级特性,Webhook 必须要打开,后面会讲到。
注:
创建spark operator的时候,一定要确保镜像能拉下来,推荐直接使用eci_open提供的镜像,因为spark operator卸载的时候也是用相同的镜像启动job进行清理,如果镜像拉不下来清理job也会卡主,导致所有的资源都要手动清理,比较麻烦。
申明wordcount SparkApplication:
apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: wordcount namespace: default spec: type: Java mode: cluster image: "registry.cn-beijing.aliyuncs.com/liumi/spark:2.4.4-example" imagePullPolicy: IfNotPresent mainClass: com.aliyun.liumi.spark.example.WordCount mainApplicationFile: "local:///opt/spark/jars/SparkExampleJava-1.0-SNAPSHOT.jar" sparkVersion: "2.4.4" restartPolicy: type: OnFailure onFailureRetries: 2 onFailureRetryInterval: 5 onSubmissionFailureRetries: 2 onSubmissionFailureRetryInterval: 10 timeToLiveSeconds: 36000 sparkConf: "spark.kubernetes.allocation.batch.size": "10" driver: cores: 2 memory: "4096m" labels: version: 2.4.4 spark-app: spark-wordcount role: driver annotations: k8s.aliyun.com/eci-image-cache: "true" serviceAccount: spark executor: cores: 1 instances: 100 memory: "1024m" labels: version: 2.4.4 role: executor annotations: k8s.aliyun.com/eci-image-cache: "true"
注:大部分的参数都可以直接通过SparkApplication CRD已经支持的参数设置,目前支持的所有参数参考:SparkApplication CRD,此外还支持直接以sparkConf形式的传入。
提交:
kubectl create -f wordcount-operator-example.yaml
结果分析
我们是100个1C1G的Excutor并发启动,应用的镜像大小约为 500 MB。
作业执行过程截图:
可以看到并发启动的100个pod基本在30s内可以完成全部的启动,其中93%可以在20秒内完成启动。
看下作业执行时间(包括了vk调度100个Excutor pod时间、每个Excutor pod资源准备的时间、以及作业实际执行的时间等):
exitCode: 0 finishedAt: '2019-11-16T07:31:59Z' reason: Completed startedAt: '2019-11-16T07:29:01Z'
可以看到总共只花了178S,时间降了一个数量级。
ACK + ECI
在Spark中,Driver和Excutor之间的启动顺序是串行的。尽管ECI展现了出色的并发创建Executor pod的能力,但是ASK这种特殊架构会让Driver和Excutor之间的这种串行体现的比较明显,通常情况下在ECI启动一个Driver pod需要大约20s的时间,然后才是大规模的Excutor pod的启动。对于一些响应要求高的应用,Driver的启动速度可能比Excutor执行作业的耗时更重要。这个时候,我们可以采用ACK+ECI,即传统的Kubernetes集群 + virtual kubelet的方式:
对于用户来说,只需如下简单的几步就可以将excutor调度到ECI的virtual node。
1、在ACK集群中安装ECI的virtual kubelet。
进入容器服务控制台的应用目录栏,搜索"ack-virtual-node":
点击进入,选择要安装的集群。
必填参数参考:
virtualNode: image: repository: registry.cn-hangzhou.aliyuncs.com/acs/virtual-nodes-eci tag: v1.0.0.1-aliyun affinityAdminssion: enabled: true image: repository: registry.cn-hangzhou.aliyuncs.com/ask/virtual-node-affinity-admission-controller tag: latest env: ECI_REGION: "cn-hangzhou" #集群所在的地域 ECI_VPC: vpc-bp187fy2e7l123456 # 集群所在的vpc,和创建集群的时候保持一致即可,可以在集群概览页查看 ECI_VSWITCH: vsw-bp1bqf53ba123456 # 资源所在的交换机,同上 ECI_SECURITY_GROUP: sg-bp12ujq5zp12346 # 资源所在的安全组,同上 ECI_ACCESS_KEY: XXXXX #账号AK ECI_SECRET_KEY: XXXXX #账号SK ALIYUN_CLUSTERID: virtual-kubelet
2、修改应用的yaml
为excutor增加如下参数即可:
nodeSelector: type: virtual-kubelet tolerations: - key: virtual-kubelet.io/provider operator: Exists
完整的应用参数如下:
apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: wordcount namespace: default spec: type: Java mode: cluster image: "registry.cn-beijing.aliyuncs.com/liumi/spark:2.4.4-example" imagePullPolicy: IfNotPresent mainClass: com.aliyun.liumi.spark.example.WordCount mainApplicationFile: "local:///opt/spark/jars/SparkExampleJava-1.0-SNAPSHOT.jar" sparkVersion: "2.4.4" restartPolicy: type: OnFailure onFailureRetries: 2 onFailureRetryInterval: 5 onSubmissionFailureRetries: 2 onSubmissionFailureRetryInterval: 10 timeToLiveSeconds: 36000 sparkConf: "spark.kubernetes.allocation.batch.size": "10" driver: cores: 2 memory: "4096m" labels: version: 2.4.4 spark-app: spark-wordcount role: driver annotations: k8s.aliyun.com/eci-image-cache: "true" serviceAccount: spark executor: cores: 1 instances: 100 memory: "1024m" labels: version: 2.4.4 role: executor annotations: k8s.aliyun.com/eci-image-cache: "true" #nodeName: virtual-kubelet nodeSelector: type: virtual-kubelet tolerations: - key: virtual-kubelet.io/provider operator: Exists
这样就可以将Driver调度到ACK,Excutor调度到ECI上,完美互补。
3、提交
效果如下:
看下作业执行时间:
exitCode: 0 finishedAt: '2019-11-16T07:25:05Z' reason: Completed startedAt: '2019-11-16T07:22:40Z'
总共花了145秒,更重要的是Driver直接在本地起,只花了约2秒的时间就启动了。
总结:
作业执行时间不是Kubernetes + ECI的绝对优势,如果在ACK上准备好足够的节点资源,也是可以达到这个水平的。
我们的优势是:
1)弹性和成本
对于不管是采用ACK + ECI还是ASK+ECI的方式,提交作业前无需提前预留任何资源,无需关心集群的扩缩容,所有资源都是随作业提交自动开始申请,作业执行结束后自动释放。作业执行完后就只剩一个SparkApplication和终态的Driver pod(只保留管控数据)。除此之外,ACK + ECI的方式还提供了更丰富的调度选择:1)可以将Driver和Excutor分开调度;2)考虑作业类型、成本等因素选择不同的调度资源,以满足更广泛的使用场景。
2)计算与存储分离
在Kubernetes中跑大数据一直很困扰的问题就是数据存储的问题,到了Serverless kubernetes这个问题就更突出。我们连节点都没有,就更不可能去搭建HDFS/Yarn集群。而事实上,在HDFS集群上跑Spark,已经不是必需的了,见引用[1, 2]。阿里云的HDFS存储也正解了我们这个痛点问题,经测试读写性能也非常不错。我们可以将计算和存储分离,即kubernetes集群中的作业可以直接原生访问HDFS的数据。除了HDFS,阿里云的NAS和OSS也是可选的数据存储。
3)调度
调度通常可以分为以YARN为代表的两级调度和集中式调度。两级调度有一个中央调度器负责宏观层面的资源调度,而应用的细粒度调度则由下层调度器来完成。集中式调度则对所有的资源请求进行统一调度,Kubernetes的调度就是典型的代表,Kubernetes通过将整个集群的资源信息缓存到本地,利用本地的数据进行乐观调度,进而提高调度器的性能。
当前kubernetes集群的达到一定规模的时候,性能会到达瓶颈,引用[3]。YARN可以说是历经了大数据领域多年锤炼的成果,采用kubernetes原生调度器来调度Spark作业能否hold住还是一个问号。
而对于Serverless Kubernetes,就变成了类两级调度:对于kubernetes来说调度其实进行了极大的简化,调度器只用将资源统一调度到virtual kubelet,而实际的细粒度调度就下沉到了阿里云强大的弹性计算的调度。
当处理的数据量越大,突发启动Excutor pod规模越大的时候,我们的优势会越明显。
参考
[1] HDFS vs. Cloud Storage: Pros, cons and migration tips
[2] New release of Cloud Storage Connector for Hadoop: Improving performance, throughput and more
[3] Understanding Scalability and Performance in the Kubernetes Master , Xingyu Chen, Fansong Zeng Alibaba Cloud
附录
Spark Base 镜像:
本样例采用的是谷歌提供的 gcr.io/spark-operator/spark:v2.4.4
ECI已经帮拉取到ACR仓库,各地域地址如下:
公网地址:registry.{对应regionId}.aliyuncs.com/eci_open/spark:2.4.4
vpc网络地址:registry-vpc.{对应regionId}.aliyuncs.com/eci_open/spark:2.4.4
Spark Operator 镜像
本样例采用的是谷歌提供的 gcr.io/spark-operator/spark-operator:v1beta2-1.0.1-2.4.4
ECI已经帮拉取到ACR仓库,各地域地址如下:
公网地址:registry.{对应regionId}.aliyuncs.com/eci_open/spark-operator:v1beta2-1.0.1-2.4.4
vpc网络地址:registry-vpc.{对应regionId}.aliyuncs.com/eci_open/spark-operator:v1beta2-1.0.1-2.4.4

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Elasticsearch Rest Client bboss v5.9.7 发布
The best Elasticsearch HighlevelRest Client API-----bbossv5.9.7 发布。 bboss elasticsearch 是一套基 于query dsl 语法操作和访问分布式搜索引擎 elasticsearch 的 o/r mapping 高性能java开发库,底层基于 es restful api。 主要功能特色 ElasticSearch兼容性:1.x,2.x,5.x,6.x,7.x,+ JDK兼容性: jdk 1.7+ Spring boot兼容性:1.x,2.x ORM和DSL二者兼顾,类mybatis方式操作ElasticSearch, 支持SQL和JDBC 提供快速而高效的数据同步导入ES工具(支持DB到Elasticsearch,Elasticsearch到DB,MongoDB到Elastisearch数据同步,后续将支持更多的数据源) 提供按时间日期ES历史数据清理工具 APM开源产品pinpoint官方Elasticsearch bboss 客户端性能监控插件,插件地址: https://github.com/...
- 下一篇
在 Cloudera Data Flow 上运行你的第一个 Flink 例子
文档编写目的 Cloudera Data Flow(CDF) 作为 Cloudera 一个独立的产品单元,围绕着实时数据采集,实时数据处理和实时数据分析有多个不同的功能模块,如下图所示: 图中 4 个功能模块从左到右分别解释如下: Cloudera Edge Management(CEM),主要是指在边缘设备如传感器上部署 MiNiFi 的 agent 后用于采集数据。 Cloudera Flow Management(CFM),主要是使用 Apache NiFi 通过界面化拖拽的方式实现数据采集,处理和转换。 Cloudera Streaming Processing(CSP),主要包括 Apache Kafka,Kafka Streams,Kafka 的监控 Streams Messaging Manager(SMM),以及跨集群 Kafka topic 的数据复制 Streams Replication Manager(SRM)。 Cloudera Streaming Analytics(CSA),以前这块是使用 Storm 来作为 Native Streaming 来补充 Sp...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Linux系统CentOS6、CentOS7手动修改IP地址
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8安装Docker,最新的服务器搭配容器使用
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Windows10,CentOS7,CentOS8安装Nodejs环境