Spark Worker启动源码分析
Spark Worker启动源码分析
更多资源
- github: https://github.com/opensourceteams/spark-scala-maven
- csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769
Youtube视频分享
- youtube: https://youtu.be/ll_Ae6rP7II
Bilibili视频分享
start-slave.sh启动脚本
- worker启动脚本跟master一样,都调用spark-daemon.sh,只是启动类不一样
CLASS="org.apache.spark.deploy.worker.Worker" spark-daemon.sh start $CLASS
Worker main入口
主要源码
- 启动 ‘sparkWorker’ 的服务
def main(argStrings: Array[String]) { Utils.initDaemon(log) val conf = new SparkConf val args = new WorkerArguments(argStrings, conf) val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir, conf = conf) rpcEnv.awaitTermination() } def startRpcEnvAndEndpoint( host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None, conf: SparkConf = new SparkConf): RpcEnv = { // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("") val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr) val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_)) rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr)) rpcEnv }
Worker onStart方法调用
WorkUI
- 启动WorkerUI
向所有master注册
- 线程池中每个master单独一个线程,向master注册worker
- worker通过 masterEndpoint.ask向master发送注册worker消息 : RegisterWorker
- master 接收到消息(RegisterWorker)处理后,回应worker消息 : RegisteredWorker
- worker收到RegisteredWorker消息后,进行 registered = true,和刷新内存中的master信息
override def onStart() { assert(!registered) logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( host, port, cores, Utils.megabytesToString(memory))) logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") logInfo("Spark home: " + sparkHome) createWorkDir() shuffleService.startIfEnabled() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() val scheme = if (webUi.sslOptions.enabled) "https" else "http" workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}" registerWithMaster() metricsSystem.registerSource(workerSource) metricsSystem.start() // Attach the worker metrics servlet handler to the web ui after the metrics system is started. metricsSystem.getServletHandlers.foreach(webUi.attachHandler) }
private def registerWithMaster() { // onDisconnected may be triggered multiple times, so don't attempt registration // if there are outstanding registration attempts scheduled. registrationRetryTimer match { case None => registered = false registerMasterFutures = tryRegisterAllMasters() connectionAttemptCount = 0 registrationRetryTimer = Some(forwordMessageScheduler.scxheduleAtFixedRate( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { Option(self).foreach(_.send(ReregisterWithMaster)) } }, INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS, INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS, TimeUnit.SECONDS)) case Some(_) => logInfo("Not spawning another attempt to register with the master, since there is an" + " attempt scheduled already.") } } private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = { masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker( workerId, host, port, self, cores, memory, workerWebUiUrl)) .onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => Utils.tryLogNonFatalError { handleRegisterResponse(msg) } case Failure(e) => logError(s"Cannot register with master: ${masterEndpoint.address}", e) System.exit(1) }(ThreadUtils.sameThread) } private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized { msg match { case RegisteredWorker(masterRef, masterWebUiUrl) => logInfo("Successfully registered with master " + masterRef.address.toSparkURL) registered = true changeMaster(masterRef, masterWebUiUrl) forwordMessageScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(SendHeartbeat) } }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) if (CLEANUP_ENABLED) { logInfo( s"Worker cleanup enabled; old application directories will be deleted in: $workDir") forwordMessageScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(WorkDirCleanup) } }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) } case RegisterWorkerFailed(message) => if (!registered) { logError("Worker registration failed: " + message) System.exit(1) } case MasterInStandby => // Ignore. Master not yet ready. } }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark Master启动源码分析
Spark Master启动源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769 Youtube 视频 Spark master启动源码分析: https://youtu.be/74q1nddoaiY BiliBili 视频 Spark master启动源码分析: https://www.bilibili.com/video/av37442271/ 启动 master 启动脚本 start-master.sh 加载配置文件 . "${SPARK_HOME}/sbin/spark-config.sh" . "${SPARK_HOME}/bin/load-spark-env.sh" 默认配置 SPARK_MASTER_PORT=7077 SPARK_MASTER_IP=`hostname` SPARK_MASTER_WEBUI_PORT=8080 CL...
- 下一篇
Spark Master资源调度--worker向master注册
Spark Master资源调度–Worker向Master注册 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769 Youtube 视频 youtube (Spark Master资源调度–Worker向Master注册) https://youtu.be/SFqXaIKt-yI Bilibili 视频 bilibili (Spark Master资源调度–Worker向Master注册) https://www.bilibili.com/video/av37442280/ Worker向Master注册 worker发送注册消息(RegisterWorker) override def onStart() { assert(!registered) logInfo("Starting Spark worker %s:%d with %d cores, ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7设置SWAP分区,小内存服务器的救世主