Spark Master Startup Source Code Analysis
More resources
- GitHub: https://github.com/opensourceteams/spark-scala-maven
- CSDN (video index): https://blog.csdn.net/thinktothings/article/details/84726769
YouTube video
- Spark Master startup source analysis: https://youtu.be/74q1nddoaiY
BiliBili video
- Spark Master startup source analysis: https://www.bilibili.com/video/av37442271/
Starting the Master
Startup script: start-master.sh
Load the configuration files
. "${SPARK_HOME}/sbin/spark-config.sh" . "${SPARK_HOME}/bin/load-spark-env.sh"
Default configuration
```shell
SPARK_MASTER_PORT=7077
SPARK_MASTER_IP=`hostname`
SPARK_MASTER_WEBUI_PORT=8080
CLASS="org.apache.spark.deploy.master.Master"
```
Invoke spark-daemon.sh
CLASS="org.apache.spark.deploy.master.Master" ORIGINAL_ARGS="$@" "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \ --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \ $ORIGINAL_ARGS
spark-daemon.sh
Parsing spark-daemon.sh
The command that is ultimately executed (with $command resolved to the Master class):
```shell
nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.master.Master "$@" >> "$log" 2>&1 < /dev/null &
newpid="$!"
```
```shell
run_command() {
  mode="$1"
  shift

  mkdir -p "$SPARK_PID_DIR"

  if [ -f "$pid" ]; then
    TARGET_ID="$(cat "$pid")"
    if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
      echo "$command running as process $TARGET_ID.  Stop it first."
      exit 1
    fi
  fi

  if [ "$SPARK_MASTER" != "" ]; then
    echo rsync from "$SPARK_MASTER"
    rsync -a -e ssh --delete --exclude=.svn --exclude='logs/*' --exclude='contrib/hod/logs/*' "$SPARK_MASTER/" "${SPARK_HOME}"
  fi

  spark_rotate_log "$log"
  echo "starting $command, logging to $log"

  case "$mode" in

    (class)
      nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &
      newpid="$!"
      ;;

    (submit)
      nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-submit --class $command "$@" >> "$log" 2>&1 < /dev/null &
      newpid="$!"
      ;;

    (*)
      echo "unknown mode: $mode"
      exit 1
      ;;
  esac

  echo "$newpid" > "$pid"
  sleep 2
  # Check if the process has died; in that case we'll tail the log so the user can see
  if [[ ! $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
    echo "failed to launch $command:"
    tail -2 "$log" | sed 's/^/  /'
    echo "full log in $log"
  fi
}
```
```shell
option=$1

case $option in

  (submit)
    run_command submit "$@"
    ;;

  (start)
    run_command class "$@"
    ;;
```
spark-class
Load the configuration files
. "${SPARK_HOME}"/bin/load-spark-env.sh
Invoke the Main entry class
```shell
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")

exec "${CMD[@]}"
```
The Main class entry point
Building the command that launches the Master class
```java
/**
 * Usage: Main [class] [class args]
 * <p>
 * This CLI works in two different modes:
 * <ul>
 *   <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy.SparkSubmit", the
 *   {@link SparkLauncher} class is used to launch a Spark application.</li>
 *   <li>"spark-class": if another class is provided, an internal Spark class is run.</li>
 * </ul>
 *
 * This class works in tandem with the "bin/spark-class" script on Unix-like systems, and
 * "bin/spark-class2.cmd" batch script on Windows to execute the final command.
 * <p>
 * On Unix-like systems, the output is a list of command arguments, separated by the NULL
 * character. On Windows, the output is a command line suitable for direct execution from the
 * script.
 */
public static void main(String[] argsArray) throws Exception {
  checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

  List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
  String className = args.remove(0);

  boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
  AbstractCommandBuilder builder;
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    try {
      builder = new SparkSubmitCommandBuilder(args);
    } catch (IllegalArgumentException e) {
      printLaunchCommand = false;
      System.err.println("Error: " + e.getMessage());
      System.err.println();

      MainClassOptionParser parser = new MainClassOptionParser();
      try {
        parser.parse(args);
      } catch (Exception ignored) {
        // Ignore parsing exceptions.
      }

      List<String> help = new ArrayList<String>();
      if (parser.className != null) {
        help.add(parser.CLASS);
        help.add(parser.className);
      }
      help.add(parser.USAGE_ERROR);
      builder = new SparkSubmitCommandBuilder(help);
    }
  } else {
    builder = new SparkClassCommandBuilder(className, args);
  }

  Map<String, String> env = new HashMap<String, String>();
  List<String> cmd = builder.buildCommand(env);
  if (printLaunchCommand) {
    System.err.println("Spark Command: " + join(" ", cmd));
    System.err.println("========================================");
  }

  if (isWindows()) {
    System.out.println(prepareWindowsCommand(cmd, env));
  } else {
    // In bash, use NULL as the arg separator since it cannot be used in an argument.
    List<String> bashCmd = prepareBashCommand(cmd, env);
    for (String c : bashCmd) {
      System.out.print(c);
      System.out.print('\0');
    }
  }
}
```
The Master class
The main method
- Starts the 'sparkMaster' RPC service
- Sends a BoundPortsRequest message to itself (see the sketch after startRpcEnvAndEndpoint below)
```scala
def main(argStrings: Array[String]) {
  Utils.initDaemon(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}
```
```scala
/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
 */
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
```
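The BoundPortsRequest that the Master sends to itself is answered inside its receiveAndReply handler. A minimal sketch of that branch, assuming Spark 1.6.x-era field names (address, webUi, restServerBoundPort), looks like this:

```scala
// Sketch of how BoundPortsRequest is answered (Spark 1.6.x-style source, for illustration):
// the Master replies with its RPC port, web UI port, and the optional REST server port.
case BoundPortsRequest => {
  context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
}
```

This reply is what allows startRpcEnvAndEndpoint above to return the bound web UI and REST ports to the caller.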
The onStart method
- Starts the MasterWebUI
- Instantiates the default persistence engine and the leader election agent
- Schedules a worker heartbeat check (every WORKER_TIMEOUT_MS, 60 seconds by default) that removes workers that have lost contact; see the timeout sketch after the listing below
```scala
override def onStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  webUi = new MasterWebUI(this, webUiPort)
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CheckForWorkerTimeOut)
    }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

  if (restServerEnabled) {
    val port = conf.getInt("spark.master.rest.port", 6066)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
  }
  restServerBoundPort = restServer.map(_.start())

  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
  // started.
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

  val serializer = new JavaSerializer(conf)
  val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      val zkFactory = new ZooKeeperRecoveryModeFactory(conf, serializer)
      (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    case "FILESYSTEM" =>
      val fsFactory = new FileSystemRecoveryModeFactory(conf, serializer)
      (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
    case "CUSTOM" =>
      val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
      val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
        .newInstance(conf, serializer)
        .asInstanceOf[StandaloneRecoveryModeFactory]
      (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
    case _ =>
      (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  }
  persistenceEngine = persistenceEngine_
  leaderElectionAgent = leaderElectionAgent_
}
```
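The CheckForWorkerTimeOut message scheduled above is handled in the Master's receive method, which drops workers whose last heartbeat is older than WORKER_TIMEOUT_MS. The following is a sketch based on the Spark 1.6.x source; method and field names (timeOutDeadWorkers, lastHeartbeat, REAPER_ITERATIONS) are assumed from that version:

```scala
// Sketch (Spark 1.6.x-style) of the worker-timeout check triggered by CheckForWorkerTimeOut.
case CheckForWorkerTimeOut => {
  timeOutDeadWorkers()
}

/** Checks for and removes any worker that has not sent a heartbeat within WORKER_TIMEOUT_MS. */
private def timeOutDeadWorkers() {
  val currentTime = System.currentTimeMillis()
  // Workers whose last heartbeat is older than the timeout are considered lost.
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
  for (worker <- toRemove) {
    if (worker.state != WorkerState.DEAD) {
      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
        worker.id, WORKER_TIMEOUT_MS / 1000))
      removeWorker(worker)
    } else {
      // Workers already marked DEAD are kept around briefly (so the UI can still show them)
      // and dropped after a few more timeout intervals.
      if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
        workers -= worker
      }
    }
  }
}
```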
- Accepts worker registration (RegisterWorker); the registerWorker helper it calls is sketched after the listing below
```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => {
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: " +
          workerAddress))
      }
    }
  }
```
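registerWorker is the helper that actually records the worker. Roughly, it drops DEAD entries for the same host and port, rejects a live duplicate at the same address, and adds the worker to the Master's bookkeeping maps. This is a sketch based on the Spark 1.6.x source; names such as workers, idToWorker, addressToWorker, and removeWorker are assumed from that version:

```scala
// Sketch (Spark 1.6.x-style) of the helper called from the RegisterWorker branch above.
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node with different IDs;
  // remove them so the new registration can proceed.
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }

  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN means it was restarted during Master recovery;
      // remove the stale entry and accept the new registration.
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }

  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  true
}
```

When registerWorker returns true, the handler above persists the worker and calls schedule() so waiting applications and drivers can be assigned resources.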