您现在的位置是:首页 > 文章详情

Spark Master资源调度--SparkContext向所有master注册

日期:2018-12-03点击:348

Spark Master资源调度–SparkContext向所有master注册

更多资源

Youtube视频分享

Bilibili视频分享

SparkContext启动向master发送消息

  • ClientEndpoint向master发送消息: RegisterApplication
 /** * Register with all masters asynchronously and returns an array `Future`s for cancellation. */ private def tryRegisterAllMasters(): Array[JFuture[_]] = { for (masterAddress <- masterRpcAddresses) yield { registerMasterThreadPool.submit(new Runnable { override def run(): Unit = try { if (registered.get) { return } logInfo("Connecting to master " + masterAddress.toSparkURL + "...") val masterRef = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME) masterRef.send(RegisterApplication(appDescription, self)) } catch { case ie: InterruptedException => // Cancelled case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) } }) } } 

master处理消息RegisterApplication

  • 创建 Application 并注册到master上
  • Application 保存到 master 存储引擎中
  • 向driver发送已注册成功消息: RegisteredApplication
 case RegisterApplication(description, driver) => { // TODO Prevent repeated registrations from some driver if (state == RecoveryState.STANDBY) { // ignore, don't send response } else { logInfo("Registering app " + description.name) val app = createApplication(description, driver) registerApplication(app) logInfo("Registered app " + description.name + " with ID " + app.id) persistenceEngine.addApplication(app) driver.send(RegisteredApplication(app.id, self)) schedule() } } 
  • 过滤所有已注册的Worker(状态为ALIVE)
  • 遍历 waitingDrivers,如果有等待中的Drivers,给worker发送启动Driver消息: LaunchDriver
  • 调用在worker上启动executor方法
 /** * Schedule the currently available resources among waiting apps. This method will be called * every time a new app joins or resource availability changes. */ private def schedule(): Unit = { if (state != RecoveryState.ALIVE) { return } // Drivers take strict precedence over executors val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) val numWorkersAlive = shuffledAliveWorkers.size var curPos = 0 for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers // We assign workers to each waiting driver in a round-robin fashion. For each driver, we // start from the last worker that was assigned a driver, and continue onwards until we have // explored all alive workers. var launched = false var numWorkersVisited = 0 while (numWorkersVisited < numWorkersAlive && !launched) { val worker = shuffledAliveWorkers(curPos) numWorkersVisited += 1 if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver launched = true } curPos = (curPos + 1) % numWorkersAlive } } startExecutorsOnWorkers() } 
  • 过滤waitingApps,刚才注册的Application已经在ArrayBuffer中
  • 对已注册的worker进行过滤
  • 过滤条件状态为ALIVE,可用cpu内核数大于等于每个executor的内核数,可用内存大于等于Application在每个executor需要的内存数
  • 对可用worker进行排序(按可用内核数从大到小排序)
  • 调用方法 scheduleExecutorsOnWorkers,worker给executor分配多少个cpu内核
 /** * Schedule and launch executors on workers */ private def startExecutorsOnWorkers(): Unit = { // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. for (app <- waitingApps if app.coresLeft > 0) { val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor // Filter out workers that don't have enough resources to launch an executor val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= coresPerExecutor.getOrElse(1)) .sortBy(_.coresFree).reverse val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) // Now that we've decided how many cores to allocate on each worker, let's allocate them for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { allocateWorkerResourceToExecutors( app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) } } } 
  • 进行具体的当前Application在Worker上给executor分配几个cpu内核
 /** * Schedule executors to be launched on the workers. * Returns an array containing number of cores assigned to each worker. * * There are two modes of launching executors. The first attempts to spread out an application's * executors on as many workers as possible, while the second does the opposite (i.e. launch them * on as few workers as possible). The former is usually better for data locality purposes and is * the default. * * The number of cores assigned to each executor is configurable. When this is explicitly set, * multiple executors from the same application may be launched on the same worker if the worker * has enough cores and memory. Otherwise, each executor grabs all the cores available on the * worker by default, in which case only one executor may be launched on each worker. * * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core * at a time). Consider the following example: cluster has 4 workers with 16 cores each. * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is * allocated at a time, 12 cores from each worker would be assigned to each executor. * Since 12 < 16, no executors would launch [SPARK-8881]. */ private def scheduleExecutorsOnWorkers( app: ApplicationInfo, usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = { val coresPerExecutor = app.desc.coresPerExecutor val minCoresPerExecutor = coresPerExecutor.getOrElse(1) val oneExecutorPerWorker = coresPerExecutor.isEmpty val memoryPerExecutor = app.desc.memoryPerExecutorMB val numUsable = usableWorkers.length val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) /** Return whether the specified worker can launch an executor for this app. */ def canLaunchExecutor(pos: Int): Boolean = { val keepScheduling = coresToAssign >= minCoresPerExecutor val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor // If we allow multiple executors per worker, then we can always launch new executors. // Otherwise, if there is already an executor on this worker, just give it more cores. val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0 if (launchingNewExecutor) { val assignedMemory = assignedExecutors(pos) * memoryPerExecutor val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit keepScheduling && enoughCores && enoughMemory && underLimit } else { // We're adding cores to an existing executor, so no need // to check memory and executor limits keepScheduling && enoughCores } } // Keep launching executors until no more workers can accommodate any // more executors, or if we have reached this application's limits var freeWorkers = (0 until numUsable).filter(canLaunchExecutor) while (freeWorkers.nonEmpty) { freeWorkers.foreach { pos => var keepScheduling = true while (keepScheduling && canLaunchExecutor(pos)) { coresToAssign -= minCoresPerExecutor assignedCores(pos) += minCoresPerExecutor // If we are launching one executor per worker, then every iteration assigns 1 core // to the executor. Otherwise, every iteration assigns cores to a new executor. if (oneExecutorPerWorker) { assignedExecutors(pos) = 1 } else { assignedExecutors(pos) += 1 } // Spreading out an application means spreading out its executors across as // many workers as possible. If we are not spreading out, then we should keep // scheduling executors on this worker until we use all of its resources. // Otherwise, just move on to the next worker. if (spreadOutApps) { keepScheduling = false } } } freeWorkers = freeWorkers.filter(canLaunchExecutor) } assignedCores } 
  • 分配worker资源给executor
  • 给worker发送启动executor消息: LaunchExecutor
  • 给driver发送Executor已增加消息:ExecutorAdded
/** * Allocate a worker's resources to one or more executors. * @param app the info of the application which the executors belong to * @param assignedCores number of cores on this worker for this application * @param coresPerExecutor number of cores per executor * @param worker the worker info */ private def allocateWorkerResourceToExecutors( app: ApplicationInfo, assignedCores: Int, coresPerExecutor: Option[Int], worker: WorkerInfo): Unit = { // If the number of cores per executor is specified, we divide the cores assigned // to this worker evenly among the executors with no remainder. // Otherwise, we launch a single executor that grabs all the assignedCores on this worker. val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1) val coresToAssign = coresPerExecutor.getOrElse(assignedCores) for (i <- 1 to numExecutors) { val exec = app.addExecutor(worker, coresToAssign) launchExecutor(worker, exec) app.state = ApplicationState.RUNNING } } 
原文链接:https://yq.aliyun.com/articles/675422
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章