Spark Streaming Dynamic Resource Allocation 文档(非官方特性)
必要配置
spark.streaming.dynamicAllocation.enabled=true
spark.streaming.dynamicAllocation.minExecutors=0
spark.streaming.dynamicAllocation.maxExecutors=50
可选配置
开启调试日志(用于观察 DRA 的扩缩容决策过程):
spark.streaming.dynamicAllocation.debug=true
spark.streaming.dynamicAllocation.delay.rounds=10
spark.streaming.dynamicAllocation.rememberBatchSize=1
spark.streaming.dynamicAllocation.releaseRounds=5
spark.streaming.dynamicAllocation.reserveRate=0.2
DRA 算法说明
注意事项
一些可以参考的调整
spark.streaming.dynamicAllocation.releaseRounds=5
spark.streaming.dynamicAllocation.reserveRate=0.2
测试代码
object IamGod {
  def main(args: Array[String]): Unit = {
    // Build a StreamingContext whose batches take a varying amount of time,
    // giving dynamic resource allocation load changes to react to.
    def createContext = {
      val conf = new SparkConf().setAppName("DRA Test")
      val ssc = new StreamingContext(conf, Seconds(30))
      // Three phases of 30 batches each; every element is a per-record sleep
      // duration in milliseconds (phase 1: 10-19s, phase 2: 30-39s, phase 3: 20-29s).
      val phaseOne = Seq.fill(30)(Seq((10 + scala.util.Random.nextInt(10)) * 1000))
      val phaseTwo = Seq.fill(30)(Seq((30 + scala.util.Random.nextInt(10)) * 1000))
      val phaseThree = Seq.fill(30)(Seq((20 + scala.util.Random.nextInt(10)) * 1000))
      val input = new TestInputStream[Int](ssc, phaseOne ++ phaseTwo ++ phaseThree, 10)
      // Sleeping for the element's value simulates real per-record work.
      val simulatedWork = input.map(ms => Thread.sleep(ms))
      // count() forces evaluation of every batch so the sleeps actually run.
      simulatedWork.foreachRDD(rdd => rdd.count())
      ssc
    }
    val ssc = createContext
    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * An InputDStream that replays a fixed, in-memory sequence of batches, one
 * inner Seq per batch interval. Useful for driving deterministic load through
 * a streaming job in tests (e.g. exercising dynamic resource allocation).
 *
 * @param _ssc          the StreamingContext this stream is attached to
 * @param input         one Seq of elements per batch; a null entry means
 *                      "produce no RDD for that batch"
 * @param numPartitions number of partitions for each generated RDD
 */
class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
  extends InputDStream[T](_ssc) {
  // No external resources to manage: explicit Unit result type instead of
  // the deprecated procedure syntax (`def start() {}`).
  def start(): Unit = {}
  def stop(): Unit = {}
  /**
   * Produces the RDD for the batch ending at `validTime`, or None when the
   * corresponding input entry is null (lets tests cover batches with no RDD).
   * Batches past the end of `input` yield an empty (non-None) RDD.
   */
  def compute(validTime: Time): Option[RDD[T]] = {
    logInfo("Computing RDD for time " + validTime)
    // Map the batch time to a 0-based index into the input sequence.
    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
    val selectedInput = if (index < input.size) input(index) else Seq[T]()
    // Option(null) == None: replaces the explicit null-check + early `return`
    // with the idiomatic combinator form; behavior is unchanged.
    Option(selectedInput).map { data =>
      // Report the input data's information to InputInfoTracker for testing
      val inputInfo = StreamInputInfo(id, data.length.toLong)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
      val rdd = ssc.sc.makeRDD(data, numPartitions)
      logInfo("Created RDD " + rdd.id + " with " + data)
      rdd
    }
  }
}