Spark Streaming Dynamic Resource Allocation 文档(非官方特性)

必要配置

通过下面参数开启DRA
spark.streaming.dynamicAllocation.enabled=true
设置最大最小的Executor 数目:
spark.streaming.dynamicAllocation.minExecutors=0
spark.streaming.dynamicAllocation.maxExecutors=50

可选配置

这些参数可以不用配置,都已经提供了一个较为合理的默认值

开启日志:

spark.streaming.dynamicAllocation.debug=true
设置DRA 生效延时:
spark.streaming.dynamicAllocation.delay.rounds=10
设置DRA 计算资源量时参考的周期数:
spark.streaming.dynamicAllocation.rememberBatchSize=1
设置DRA 释放资源的步调:
spark.streaming.dynamicAllocation.releaseRounds=5
设置DRA 资源额外保留比例:
spark.streaming.dynamicAllocation.reserveRate=0.2

DRA 算法说明

减少资源时,采用启发式算法。根据之前周期的处理时间,计算需要保留的资源量(A),然后尝试分多轮试探性的减少(B),每个计算周期都会重复A,B动作,最后会收敛到一个具体的数值。
如果一旦发生延时,则会立马向Yarn申请spark.streaming.dynamicAllocation.maxExecutors 个Executor,以保证可以最快速度消除延时。富余出来的资源会通过减少资源的动作慢慢进行减少,让程序趋于稳定。
发生减少资源的动作,则剔除的掉的Executor 会被立刻(几毫秒/纳秒)屏蔽,并且不再分配Task,之后再由Yarn异步移除。
添加资源的动作,则由Yarn决定

注意事项

 请务必保证你Package 的App包不包含spark 相关的组件。否则你会看到自己的设置并不生效,因为运行的时候用了你的App里的spark-core,spark-streaming jar包了。

一些可以参考的调整

如果系统趋向稳定后,经过人工观察发现其实还可以再降资源,则可以尝试调低
spark.streaming.dynamicAllocation.releaseRounds=5
spark.streaming.dynamicAllocation.reserveRate=0.2
建议releaseRounds 不低于2,reserveRate 不低于0.05。避免系统发生颠簸。

测试代码

object IamGod {
  def main(args: Array[String]): Unit = {

    def createContext = {
      val conf = new SparkConf().setAppName("DRA Test")
      val ssc = new StreamingContext(conf, Seconds(30))

      val items1 = Seq.fill(30)(Seq((10 + scala.util.Random.nextInt(10)) * 1000))
      val items2 = Seq.fill(30)(Seq((30 + scala.util.Random.nextInt(10)) * 1000))
      val items3 = Seq.fill(30)(Seq((20 + scala.util.Random.nextInt(10)) * 1000))

      val fileInput = new TestInputStream[Int](ssc, items1 ++ items2 ++ items3, 10)

      val logs = fileInput.map(f => Thread.sleep(f))

      logs.foreachRDD { rdd =>
        rdd.count()
      }

      ssc
    }

    val ssc = createContext

    ssc.start()
    ssc.awaitTermination()

  }
}
前面引用了一个测试类:
class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
  extends InputDStream[T](_ssc) {

  def start() {}

  def stop() {}

  def compute(validTime: Time): Option[RDD[T]] = {
    logInfo("Computing RDD for time " + validTime)
    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
    val selectedInput = if (index < input.size) input(index) else Seq[T]()

    // lets us test cases where RDDs are not created
    if (selectedInput == null) {
      return None
    }

    // Report the input data's information to InputInfoTracker for testing
    val inputInfo = StreamInputInfo(id, selectedInput.length.toLong)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
    logInfo("Created RDD " + rdd.id + " with " + selectedInput)
    Some(rdd)
  }
}
优秀的个人博客,低调大师

微信关注我们

原文链接:https://yq.aliyun.com/articles/60266

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
Mario,低调大师唯一一个Java游戏作品

Mario,低调大师唯一一个Java游戏作品

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。

Java Development Kit(Java开发工具)

Java Development Kit(Java开发工具)

JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心,它包含了JAVA的运行环境(JVM+Java系统类库)和JAVA工具。

Sublime Text 一个代码编辑器

Sublime Text 一个代码编辑器

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。