Spark Master资源调度--worker向master注册
Spark Master资源调度–Worker向Master注册
更多资源
- github: https://github.com/opensourceteams/spark-scala-maven
- csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769
Youtube 视频
- youtube (Spark Master资源调度–Worker向Master注册) https://youtu.be/SFqXaIKt-yI
Bilibili 视频
- bilibili (Spark Master资源调度–Worker向Master注册) https://www.bilibili.com/video/av37442280/
Worker向Master注册
worker发送注册消息(RegisterWorker)
override def onStart() { assert(!registered) logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( host, port, cores, Utils.megabytesToString(memory))) logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") logInfo("Spark home: " + sparkHome) createWorkDir() shuffleService.startIfEnabled() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() val scheme = if (webUi.sslOptions.enabled) "https" else "http" workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}" registerWithMaster() metricsSystem.registerSource(workerSource) metricsSystem.start() // Attach the worker metrics servlet handler to the web ui after the metrics system is started. metricsSystem.getServletHandlers.foreach(webUi.attachHandler) }
private def registerWithMaster() { // onDisconnected may be triggered multiple times, so don't attempt registration // if there are outstanding registration attempts scheduled. registrationRetryTimer match { case None => registered = false registerMasterFutures = tryRegisterAllMasters() connectionAttemptCount = 0 registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { Option(self).foreach(_.send(ReregisterWithMaster)) } }, INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS, INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS, TimeUnit.SECONDS)) case Some(_) => logInfo("Not spawning another attempt to register with the master, since there is an" + " attempt scheduled already.") } }
private def tryRegisterAllMasters(): Array[JFuture[_]] = { masterRpcAddresses.map { masterAddress => registerMasterThreadPool.submit(new Runnable { override def run(): Unit = { try { logInfo("Connecting to master " + masterAddress + "...") val masterEndpoint = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME) registerWithMaster(masterEndpoint) } catch { case ie: InterruptedException => // Cancelled case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) } } }) } }
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = { masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker( workerId, host, port, self, cores, memory, workerWebUiUrl)) .onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => Utils.tryLogNonFatalError { handleRegisterResponse(msg) } case Failure(e) => logError(s"Cannot register with master: ${masterEndpoint.address}", e) System.exit(1) }(ThreadUtils.sameThread) }
Master处理Worker的注册消息
receiveAndReply接收消息
- 在master上new WorkerInfo信息
- WorkerInfo信息注册到master上(内存中)
- 把WorkerInfo信息保存到master的存储引擎中
- 给Worker发送消息: RegisteredWorker
- 调用master的资源调试方法,一般在worker启动时,此时还没有新的作业提交,所以此时资源调度是没有实际分配的
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RegisterWorker( id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { context.reply(MasterInStandby) } else if (idToWorker.contains(id)) { context.reply(RegisterWorkerFailed("Duplicate worker ID")) } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, workerRef, workerWebUiUrl) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) context.reply(RegisteredWorker(self, masterWebUiUrl)) schedule() } else { val workerAddress = worker.endpoint.address logWarning("Worker registration failed. Attempted to re-register worker at same " + "address: " + workerAddress) context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)) } } }
private def registerWorker(worker: WorkerInfo): Boolean = { // There may be one or more refs to dead workers on this same node (w/ different ID's), // remove them. workers.filter { w => (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD) }.foreach { w => workers -= w } val workerAddress = worker.endpoint.address if (addressToWorker.contains(workerAddress)) { val oldWorker = addressToWorker(workerAddress) if (oldWorker.state == WorkerState.UNKNOWN) { // A worker registering from UNKNOWN implies that the worker was restarted during recovery. // The old worker must thus be dead, so we will remove it and accept the new worker. removeWorker(oldWorker) } else { logInfo("Attempted to re-register worker at same address: " + workerAddress) return false } } workers += worker idToWorker(worker.id) = worker addressToWorker(workerAddress) = worker true }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark Worker启动源码分析
Spark Worker启动源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769 Youtube视频分享 youtube: https://youtu.be/ll_Ae6rP7II Bilibili视频分享 bilibili: https://www.bilibili.com/video/av37442247/ start-slave.sh启动脚本 worker启动脚本跟master一样,都调用spark-daemon.sh,只是启动类不一样 CLASS="org.apache.spark.deploy.worker.Worker" spark-daemon.sh start $CLASS Worker main入口 主要源码 启动 ‘sparkWorker’ 的服务 def main(argStrings: Array[String]) { Ut...
- 下一篇
Spark Master资源调度--SparkContext向所有master注册
Spark Master资源调度–SparkContext向所有master注册 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769 Youtube视频分享 Spark Master资源调度–SparkContext向所有master注册 : https://youtu.be/AXxCnCc5Mh0 Bilibili视频分享 Spark Master资源调度–SparkContext向所有master注册 : https://www.bilibili.com/video/av37442295/ SparkContext启动向master发送消息 ClientEndpoint向master发送消息: RegisterApplication /** * Register with all masters asynchronously and returns a...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS关闭SELinux安全模块
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Red5直播服务器,属于Java语言的直播服务器