Spark Streaming Dynamic Resource Allocation

Problem Statement

Dynamic resource allocation (DRA) has been available since Spark 1.2. However, the existing Spark DRA implementation on YARN does not take the specific characteristics of Spark Streaming into account.
Spark DRA removes executors that have been idle for removeExecutorInterval, and adds new executors when there is a backlog of pending tasks waiting to be scheduled. This mechanism works well for long-running stages. Spark Streaming, however, is a micro-batch processing system, so the batch duration is usually short. Removing executors at the end of one batch and adding them back at the beginning of the next causes churn.
In real-time data processing, the data volume usually rises or falls slowly, so adjacent batches have almost the same amount of data to process.
The ideal outcome for Spark Streaming is a processing time equal to the batch duration. The key idea is therefore to decrease or increase resources until the processing time is as close as possible to the batch duration.

Goals

The goal is to bring the processing time as close as possible to the batch duration by decreasing or increasing resources in Spark Streaming. We also want the system to reach a reasonably stable state after a short adjustment period, so that resource adjustments rarely happen.

Design Summary

Like Spark DRA, Spark Streaming DRA is disabled by default, but it can be enabled with a new Spark configuration property, spark.streaming.dynamicAllocation.enabled. All Spark Streaming DRA properties introduced here are grouped under the spark.streaming.dynamicAllocation.* namespace.

Adding Executors

Jobs in Spark Streaming are time-sensitive. As soon as a batch is delayed, the adding-executors action is triggered. This action immediately requests spark.streaming.dynamicAllocation.maxExecutors executors from YARN in a greedy way, because we want to eliminate the delay as soon as possible. At the same time, this accounts for the situation in which requesting resources over multiple rounds may run into resource scarcity in a shared cluster.
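The greedy scale-up decision can be sketched as follows. This is only an illustration under stated assumptions: a batch is treated as delayed when its processing time exceeds the batch duration, and the object and method names (AddExecutorsSketch, executorsToRequest) are hypothetical and not part of the proposed classes.

```scala
object AddExecutorsSketch {
  // Hypothetical helper: number of additional executors to request.
  // Assumption: "delayed" means processing time > batch duration.
  def executorsToRequest(
      batchDurationMs: Long,
      processingTimeMs: Long,
      currentExecutors: Int,
      maxExecutors: Int): Int = {
    val delayed = processingTimeMs > batchDurationMs
    // Greedy: jump straight to maxExecutors instead of growing step by step.
    if (delayed) math.max(maxExecutors - currentExecutors, 0) else 0
  }
}
```

For example, with a 10 s batch that took 12 s, 20 running executors and maxExecutors=50, the sketch asks for 30 more executors in a single request.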

Removing Executors

As mentioned above, Spark Streaming is time-sensitive, so removing executors must not consume too much of the batch duration. Executors marked for removal are therefore added to a pendingToRemove set so that no new tasks are launched on them, and we then ask YARN to kill them asynchronously.
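A minimal sketch of this two-step removal is shown below. Only the pendingToRemove name comes from the text above. The object RemoveExecutorsSketch, its methods, and the killFn parameter (which stands in for the real kill request to the cluster manager) are assumptions made for illustration.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

object RemoveExecutorsSketch {
  // Executors marked for removal; they must not receive new tasks.
  private val pendingToRemove = mutable.Set.empty[String]

  // Step 1 (cheap, synchronous): mark the executors.
  def markForRemoval(ids: Seq[String]): Unit = synchronized {
    pendingToRemove ++= ids
  }

  // The scheduler only offers tasks to executors that are not marked.
  def schedulable(all: Seq[String]): Seq[String] = synchronized {
    all.filterNot(pendingToRemove.contains)
  }

  // Step 2 (asynchronous): hand the marked executors to killFn on another
  // thread so the batch itself is not blocked. killFn is a hypothetical
  // stand-in for the kill request sent to YARN.
  def killAsync(killFn: Seq[String] => Unit)(implicit ec: ExecutionContext): Future[Unit] = {
    val ids = synchronized { pendingToRemove.toSeq }
    Future(killFn(ids))
  }
}
```

Marking is the only part that runs on the batch's critical path, which is why the removal does not eat into the batch duration.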

Algorithm of DRA

The number of executors to remove in each round is calculated with the following formula:
val totalRemoveExecutorNum = Math.round(
  currentExecutors * (
    (duration.toDouble - processDuration) / duration - reserveRate))
val actualShouldRemoveExecutorNumInThisRound = totalRemoveExecutorNum / releaseRounds
This formula assumes that processing time is strongly related to the number of executors, although the relationship is not linear. To compensate, we introduce the parameters reserveRate and releaseRounds so that the adjustment fits this non-linear relationship.
This gives a heuristic strategy for reducing the number of executors. The formula is evaluated in every round, so the number is readjusted each round, and actualShouldRemoveExecutorNumInThisRound eventually converges to zero.
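To make the arithmetic concrete, here is a self-contained sketch of the formula. The wrapper object RemovalFormulaSketch and the method name are hypothetical. Clamping a negative total to zero is an added assumption, since the formula as written can go negative when the processing time is close to the batch duration.

```scala
object RemovalFormulaSketch {
  // Number of executors to release in the current round.
  def executorsToRemoveThisRound(
      currentExecutors: Int,
      durationMs: Long,
      processDurationMs: Long,
      reserveRate: Double,
      releaseRounds: Int): Long = {
    // Fraction of the batch duration that was idle, minus the reserved share.
    val idleShare = (durationMs.toDouble - processDurationMs) / durationMs - reserveRate
    val total = math.round(currentExecutors * idleShare)
    // Assumption: never remove a negative number of executors.
    // Integer division spreads the release over releaseRounds rounds.
    math.max(total, 0L) / releaseRounds
  }
}
```

With 50 executors, a 10 s batch that takes 4 s, reserveRate=0.2 and releaseRounds=5, the total is round(50 * (0.6 - 0.2)) = 20, so 4 executors are released in this round. Once the processing time rises to 9 s, the idle share becomes negative and nothing is released.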

Implementation of DRA

The following new classes are added to the Spark Streaming module:
package org.apache.spark.streaming

private[spark] class StreamingExecutorAllocationManager(
    client: ExecutorAllocationClient,
    duration: Long,
    streamingListenerBus: StreamingListenerBus,
    listenerBus: LiveListenerBus,
    conf: SparkConf) extends Logging {
  allocationManager =>
  ......
}

private class StreamingExecutorAllocationListener extends SparkListener {
......
}

private class StreamingSchedulerListener extends StreamingListener {
.....
}
StreamingExecutorAllocationManager is initialized in StreamingContext.
Classes affected in the Spark core module:
package org.apache.spark
private[spark] trait ExecutorAllocationClient {
  // Blacklists executors that are about to be removed so that no new tasks
  // are scheduled on them, which makes it possible for YARN to remove them
  // asynchronously.
  def addExecutorToPendingStatus(executorId: String): Unit = {}

  // Spark Streaming is initialized later than the Spark core module,
  // so we need to know how many executors already exist when
  // Spark Streaming is initialized.
  def executors(): List[String] = { List() }
}
SparkContext and CoarseGrainedSchedulerBackend are both affected because they extend ExecutorAllocationClient.
JobScheduler should also be modified so that resources can be adjusted before a batch job is submitted.
package org.apache.spark.streaming.scheduler
private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging {

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)

      // Before the jobs are submitted, compute the resources actually required.
      ssc.executorAllocationManager match {
        case Some(eam) => eam.run()
        case None => // do nothing
      }
      .........
    }
  }
  ......
}

DRA Available Properties

Enable DRA:
spark.streaming.dynamicAllocation.enabled=true
Upper and lower bounds for the number of executors when dynamic allocation is enabled.
spark.streaming.dynamicAllocation.minExecutors=0
spark.streaming.dynamicAllocation.maxExecutors=50
Print more messages to the log when DRA is enabled. Default is false.
spark.streaming.dynamicAllocation.debug=true
Number of rounds (batches) over which the resources calculated for release in the current round are released. Default is 5.
spark.streaming.dynamicAllocation.releaseRounds=5
Number of rounds (batches) to remember. Increase this number if the batch processing time is unstable. It affects processDuration in (duration.toDouble - processDuration). Default is 1.
spark.streaming.dynamicAllocation.rememberBatchSize=1
DRA starts working only after the specified number of rounds has been submitted. Default is 10.
spark.streaming.dynamicAllocation.delay.rounds=10
Ensures that the retained resources stay above reserveRate * the current number of executors. More useful than minExecutors. Default is 0.2.
spark.streaming.dynamicAllocation.reserveRate=0.2
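As an illustration of how rememberBatchSize could feed processDuration, the sketch below keeps a sliding window of recent processing times. The text above only says that the remembered rounds affect processDuration. Combining them by arithmetic mean is an assumption, and the object ProcessDurationWindowSketch and its methods are hypothetical.

```scala
object ProcessDurationWindowSketch {
  // Append the newest processing time and keep only the last
  // rememberBatchSize samples.
  def remember(history: Vector[Long], latestMs: Long, rememberBatchSize: Int): Vector[Long] =
    (history :+ latestMs).takeRight(rememberBatchSize)

  // Assumption: remembered samples are combined by arithmetic mean
  // (integer division, in milliseconds).
  def processDuration(history: Vector[Long]): Long =
    if (history.isEmpty) 0L else history.sum / history.size
}
```

With rememberBatchSize=1 only the latest batch counts. A larger window smooths out a single unusually fast or slow batch before it reaches the removal formula.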

Original article: https://yq.aliyun.com/articles/60270
