Spark Worker启动源码分析
Spark Worker启动源码分析
更多资源
- github: https://github.com/opensourceteams/spark-scala-maven
 - csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769
 
Youtube视频分享
- youtube: https://youtu.be/ll_Ae6rP7II
 
Bilibili视频分享
start-slave.sh启动脚本
- worker启动脚本跟master一样,都调用spark-daemon.sh,只是启动类不一样
 
CLASS="org.apache.spark.deploy.worker.Worker"
spark-daemon.sh start $CLASS 
 
 Worker main入口
主要源码
- 启动 ‘sparkWorker’ 的服务
 
def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)
    rpcEnv.awaitTermination()
  }
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): RpcEnv = {
    // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))
    rpcEnv
  }
 
 Worker onStart方法调用
WorkUI
- 启动WorkerUI
 
向所有master注册
- 线程池中每个master单独一个线程,向master注册worker
 - worker通过 masterEndpoint.ask向master发送注册worker消息 : RegisterWorker
 - master 接收到消息(RegisterWorker)处理后,回应worker消息 : RegisteredWorker
 - worker收到RegisteredWorker消息后,进行 registered = true,和刷新内存中的master信息
 
 override def onStart() {
    assert(!registered)
    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
      host, port, cores, Utils.megabytesToString(memory)))
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    logInfo("Spark home: " + sparkHome)
    createWorkDir()
    shuffleService.startIfEnabled()
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()
    val scheme = if (webUi.sslOptions.enabled) "https" else "http"
    workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}"
    registerWithMaster()
    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
    // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }
 
  private def registerWithMaster() {
    // onDisconnected may be triggered multiple times, so don't attempt registration
    // if there are outstanding registration attempts scheduled.
    registrationRetryTimer match {
      case None =>
        registered = false
        registerMasterFutures = tryRegisterAllMasters()
        connectionAttemptCount = 0
        registrationRetryTimer = Some(forwordMessageScheduler.scxheduleAtFixedRate(
          new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReregisterWithMaster))
            }
          },
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          TimeUnit.SECONDS))
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
  }
  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, workerWebUiUrl))
      .onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          Utils.tryLogNonFatalError {
            handleRegisterResponse(msg)
          }
        case Failure(e) =>
          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
          System.exit(1)
      }(ThreadUtils.sameThread)
  }
  private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
    msg match {
      case RegisteredWorker(masterRef, masterWebUiUrl) =>
        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
        registered = true
        changeMaster(masterRef, masterWebUiUrl)
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(SendHeartbeat)
          }
        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
        if (CLEANUP_ENABLED) {
          logInfo(
            s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(WorkDirCleanup)
            }
          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
        }
      case RegisterWorkerFailed(message) =>
        if (!registered) {
          logError("Worker registration failed: " + message)
          System.exit(1)
        }
      case MasterInStandby =>
        // Ignore. Master not yet ready.
    }
  }
 
关注公众号
					低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 
							
								
								    上一篇
								    
								
								Spark Master启动源码分析
Spark Master启动源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769 Youtube 视频 Spark master启动源码分析: https://youtu.be/74q1nddoaiY BiliBili 视频 Spark master启动源码分析: https://www.bilibili.com/video/av37442271/ 启动 master 启动脚本 start-master.sh 加载配置文件 . "${SPARK_HOME}/sbin/spark-config.sh" . "${SPARK_HOME}/bin/load-spark-env.sh" 默认配置 SPARK_MASTER_PORT=7077 SPARK_MASTER_IP=`hostname` SPARK_MASTER_WEBUI_PORT=8080 CL...
 - 
							
								
								    下一篇
								    
								
								Spark Master资源调度--worker向master注册
Spark Master资源调度–Worker向Master注册 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769 Youtube 视频 youtube (Spark Master资源调度–Worker向Master注册) https://youtu.be/SFqXaIKt-yI Bilibili 视频 bilibili (Spark Master资源调度–Worker向Master注册) https://www.bilibili.com/video/av37442280/ Worker向Master注册 worker发送注册消息(RegisterWorker) override def onStart() { assert(!registered) logInfo("Starting Spark worker %s:%d with %d cores, ...
 
相关文章
文章评论
共有0条评论来说两句吧...

			
				
				
				
				
				
				
				
微信收款码
支付宝收款码