SparkContext的初始化(叔篇)——TaskScheduler的启动
《深入理解Spark:核心思想与源码分析》一书前言的内容请看链接《深入理解SPARK:核心思想与源码分析》一书正式出版上市
《深入理解Spark:核心思想与源码分析》一书第一章的内容请看链接《第1章 环境准备》
《深入理解Spark:核心思想与源码分析》一书第二章的内容请看链接《第2章 SPARK设计理念与基本架构》
由于本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现。
《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(伯篇)》
《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(仲篇)》
本文展现第3章第三部分的内容:
3.8 TaskScheduler的启动
3.7节介绍了任务调度器TaskScheduler的创建,要想TaskScheduler发挥作用,必须要启动它,代码如下。
taskScheduler.start() TaskScheduler在启动的时候,实际调用了backend的start方法。 override def start() { backend.start() }
以LocalBackend为例,启动LocalBackend时向actorSystem注册了LocalActor,见代码清单3-30所示(在《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(仲篇)》一文中)。
3.8.1 创建LocalActor
创建LocalActor的过程主要是构建本地的Executor,见代码清单3-36。
代码清单3-36 LocalActor的实现
private[spark] class LocalActor(scheduler: TaskSchedulerImpl, executorBackend: LocalBackend, private val totalCores: Int) extends Actor with ActorLogReceive with Logging { import context.dispatcher // to use Akka's scheduler.scheduleOnce() private var freeCores = totalCores private val localExecutorId = SparkContext.DRIVER_IDENTIFIER private val localExecutorHostname = "localhost" val executor = new Executor( localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true) override def receiveWithLogging = { case ReviveOffers => reviveOffers() case StatusUpdate(taskId, state, serializedData) => scheduler.statusUpdate(taskId, state, serializedData) if (TaskState.isFinished(state)) { freeCores += scheduler.CPUS_PER_TASK reviveOffers() } case KillTask(taskId, interruptThread) => executor.killTask(taskId, interruptThread) case StopExecutor => executor.stop() } }
Executor的构建,见代码清单3-37,主要包括以下步骤:
1) 创建并注册ExecutorSource。ExecutorSource是做什么的呢?笔者将在3.10.2节详细介绍。
2) 获取SparkEnv。如果是非local模式,Worker上的CoarseGrainedExecutorBackend向Driver上的CoarseGrainedExecutorBackend注册Executor时,则需要新建SparkEnv。可以修改属性spark.executor.port(默认为0,表示随机生成)来配置Executor中的ActorSystem的端口号。
3) 创建并注册ExecutorActor。ExecutorActor负责接受发送给Executor的消息。
4) urlClassLoader的创建。为什么需要创建这个ClassLoader?在非local模式中,Driver或者Worker上都会有多个Executor,每个Executor都设置自身的urlClassLoader,用于加载任务上传的jar包中的类,有效对任务的类加载环境进行隔离。
5) 创建Executor执行TaskRunner任务(TaskRunner将在5.5节介绍)的线程池。此线程池是通过调用Utils.newDaemonCachedThreadPool创建的,具体实现请参阅附录A。
6) 启动Executor的心跳线程。此线程用于向Driver发送心跳。
此外,还包括Akka发送消息的帧大小(10485760字节)、结果总大小的字节限制(1073741824字节)、正在运行的task的列表、设置serializer的默认ClassLoader为创建的ClassLoader等。
代码清单3-37 Executor的构建
val executorSource = new ExecutorSource(this, executorId) private val env = { if (!isLocal) { val port = conf.getInt("spark.executor.port", 0) val _env = SparkEnv.createExecutorEnv( conf, executorId, executorHostname, port, numCores, isLocal, actorSystem) SparkEnv.set(_env) _env.metricsSystem.registerSource(executorSource) _env.blockManager.initialize(conf.getAppId) _env } else { SparkEnv.get } } private val executorActor = env.actorSystem.actorOf( Props(new ExecutorActor(executorId)), "ExecutorActor") private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) env.serializer.setDefaultClassLoader(urlClassLoader) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) private val maxResultSize = Utils.getMaxResultSize(conf) val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker") private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] startDriverHeartbeater()
3.8.2 ExecutorSource的创建与注册
ExecutorSource用于测量系统。通过metricRegistry的register方法注册计量,这些计量信息包括threadpool.activeTasks、threadpool.completeTasks、threadpool.currentPool_size、threadpool.maxPool_size、filesystem.hdfs.write_bytes、filesystem.hdfs.read_ops、filesystem.file.write_bytes、filesystem.hdfs.largeRead_ops、filesystem.hdfs.write_ops等,ExecutorSource的实现见代码清单3-38。Metric接口的具体实现,参考附录D。
代码清单3-38 ExecutorSource的实现
private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source { private def fileStats(scheme: String) : Option[FileSystem.Statistics] = FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption private def registerFileSystemStat[T]( scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = { metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] { override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue) }) } override val metricRegistry = new MetricRegistry() override val sourceName = "executor" metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] { override def getValue: Int = executor.threadPool.getActiveCount() }) metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] { override def getValue: Long = executor.threadPool.getCompletedTaskCount() }) metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] { override def getValue: Int = executor.threadPool.getPoolSize() }) metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() }) // Gauge for file system stats of this executor for (scheme <- Array("hdfs", "file")) { registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L) registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L) registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0) registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0) registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0) } }
创建完ExecutorSource后,调用MetricsSystem的registerSource方法将ExecutorSource注册到MetricsSystem。registerSource方法使用MetricRegistry的register方法,将Source注册到MetricRegistry,见代码清单3-39。关于MetricRegistry,具体参阅附录D。
代码清单3-39 MetricsSystem注册Source的实现
def registerSource(source: Source) { sources += source try { val regName = buildRegistryName(source) registry.register(regName, source.metricRegistry) } catch { case e: IllegalArgumentException => logInfo("Metrics already registered", e) } }
3.8.3 ExecutorActor的构建与注册
ExecutorActor很简单,当接收到SparkUI发来的消息时,将所有线程的栈信息发送回去,代码实现如下。
override def receiveWithLogging = { case TriggerThreadDump => sender ! Utils.getThreadDump() }
3.8.4 Spark自身ClassLoader的创建
获取要创建的ClassLoader的父加载器currentLoader,然后根据currentJars生成URL数组,spark.files.userClassPathFirst属性指定加载类时是否先从用户的classpath下加载,最后创建ExecutorURLClassLoader或者ChildExecutorURLClassLoader,见代码清单3-40。
代码清单3-40 Spark自身ClassLoader的创建
private def createClassLoader(): MutableURLClassLoader = { val currentLoader = Utils.getContextOrSparkClassLoader val urls = currentJars.keySet.map { uri => new File(uri.split("/").last).toURI.toURL }.toArray val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false) userClassPathFirst match { case true => new ChildExecutorURLClassLoader(urls, currentLoader) case false => new ExecutorURLClassLoader(urls, currentLoader) } }
Utils.getContextOrSparkClassLoader的实现见附录A。ExecutorURLClassLoader或者ChildExecutorURLClassLoader实际上都继承了URLClassLoader,见代码清单3-41。
代码清单3-41 ChildExecutorURLClassLoader与ExecutorURLClassLoader的实现
private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader) extends MutableURLClassLoader { private object userClassLoader extends URLClassLoader(urls, null){ override def addURL(url: URL) { super.addURL(url) } override def findClass(name: String): Class[_] = { super.findClass(name) } } private val parentClassLoader = new ParentClassLoader(parent) override def findClass(name: String): Class[_] = { try { userClassLoader.findClass(name) } catch { case e: ClassNotFoundException => { parentClassLoader.loadClass(name) } } } def addURL(url: URL) { userClassLoader.addURL(url) } def getURLs() = { userClassLoader.getURLs() } } private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader) extends URLClassLoader(urls, parent) with MutableURLClassLoader { override def addURL(url: URL) { super.addURL(url) } }
如果需要REPL交互,还会调用addReplClassLoaderIfNeeded创建replClassLoader,见代码清单3-42。
代码清单3-42 addReplClassLoaderIfNeeded的实现
private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = { val classUri = conf.get("spark.repl.class.uri", null) if (classUri != null) { logInfo("Using REPL class URI: " + classUri) val userClassPathFirst: java.lang.Boolean = conf.getBoolean("spark.files.userClassPathFirst", false) try { val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader") .asInstanceOf[Class[_ <: ClassLoader]] val constructor = klass.getConstructor(classOf[SparkConf], classOf[String], classOf[ClassLoader], classOf[Boolean]) constructor.newInstance(conf, classUri, parent, userClassPathFirst) } catch { case _: ClassNotFoundException => logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!") System.exit(1) null } } else { parent } }
3.8.5 启动Executor的心跳线程
Executor的心跳由startDriverHeartbeater启动,见代码清单3-43。Executor心跳线程的间隔由属性spark.executor.heartbeatInterval配置,默认是10000毫秒。此外,超时时间是30秒,超时重试次数是3次,重试间隔是3000毫秒,使用actorSystem.actorSelection (url)方法查找到匹配的Actor引用, url是akka.tcp://sparkDriver@ $driverHost:$driverPort/user/HeartbeatReceiver,最终创建一个运行过程中,每次会休眠10000到20000毫秒的线程。此线程从runningTasks获取最新的有关Task的测量信息,将其与executorId、blockManagerId封装为Heartbeat消息,向HeartbeatReceiver发送Heartbeat消息。
代码清单3-43 启动Executor的心跳线程
def startDriverHeartbeater() { val interval = conf.getInt("spark.executor.heartbeatInterval", 10000) val timeout = AkkaUtils.lookupTimeout(conf) val retryAttempts = AkkaUtils.numRetries(conf) val retryIntervalMs = AkkaUtils.retryWaitMs(conf) val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf,env.actorSystem) val t = new Thread() { override def run() { // Sleep a random interval so the heartbeats don't end up in sync Thread.sleep(interval + (math.random * interval).asInstanceOf[Int]) while (!isStopped) { val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]() val curGCTime = gcTime for (taskRunner <- runningTasks.values()) { if (!taskRunner.attemptedTask.isEmpty) { Option(taskRunner.task).flatMap(_.metrics).foreach { metrics => metrics.updateShuffleReadMetrics metrics.jvmGCTime = curGCTime - taskRunner.startGCTime if (isLocal) { val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics)) tasksMetrics += ((taskRunner.taskId, copiedMetrics)) } else { // It will be copied by serialization tasksMetrics += ((taskRunner.taskId, metrics)) } } } } val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId) try { val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef, retryAttempts, retryIntervalMs, timeout) if (response.reregisterBlockManager) { logWarning("Told to re-register on heartbeat") env.blockManager.reregister() } } catch { case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t) } Thread.sleep(interval) } } } t.setDaemon(true) t.setName("Driver Heartbeater") t.start() }
这个心跳线程的作用是什么呢?其作用有两个:
更新正在处理的任务的测量信息;
通知BlockManagerMaster,此Executor上的BlockManager依然活着。
下面对心跳线程的实现详细分析下,读者可以自行选择是否需要阅读。
初始化TaskSchedulerImpl后会创建心跳接收器HeartbeatReceiver。HeartbeatReceiver接受所有分配给当前Driver Application的Executor的心跳,并将Task、Task计量信息、心跳等交给TaskSchedulerImpl和DAGScheduler作进一步处理。创建心跳接收器的代码如下。
private val heartbeatReceiver = env.actorSystem.actorOf( Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
HeartbeatReceiver在收到心跳消息后,会调用TaskScheduler的executorHeartbeatReceived方法,代码如下。
override def receiveWithLogging = { case Heartbeat(executorId, taskMetrics, blockManagerId) => val response = HeartbeatResponse( !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)) sender ! response }
executorHeartbeatReceived的实现代码如下。
val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized { taskMetrics.flatMap { case (id, metrics) => taskIdToTaskSetId.get(id) .flatMap(activeTaskSets.get) .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics)) } } dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
这段程序通过遍历taskMetrics,依据taskIdToTaskSetId和activeTaskSets找到TaskSetManager。然后将taskId、TaskSetManager.stageId、TaskSetManager .taskSet.attempt、TaskMetrics封装到Array[(Long, Int, Int, TaskMetrics)]的数组metricsWithStageIds中。最后调用了dagScheduler的executorHeartbeatReceived方法,其实现如下。
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics)) implicit val timeout = Timeout(600 seconds) Await.result( blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId), timeout.duration).asInstanceOf[Boolean]
dagScheduler将executorId、metricsWithStageIds封装为SparkListenerExecutorMetricsUpdate事件,并post到listenerBus中,此事件用于更新Stage的各种测量数据。最后给BlockManagerMaster持有的BlockManagerMasterActor发送BlockManagerHeartbeat消息。BlockManagerMasterActor在收到消息后会匹配执行heartbeatReceived方法(会在4.3.1节介绍)。heartbeatReceived最终更新BlockManagerMaster对BlockManager最后可见时间(即更新BlockManagerId对应的BlockManagerInfo的_lastSeenMs,见代码清单3-44)。
代码清单3-44 BlockManagerMasterActor的心跳处理
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = { if (!blockManagerInfo.contains(blockManagerId)) { blockManagerId.isDriver && !isLocal } else { blockManagerInfo(blockManagerId).updateLastSeenMs() true } }
local模式下Executor的心跳通信过程,可以用图3-3来表示。
图3-3 Executor的心跳通信过程
注意:在非local模式中Executor发送心跳的过程是一样的,主要的区别是Executor进程与Driver不在同一个进程,甚至不在同一个节点上。
接下来会初始化块管理器BlockManager,代码如下。
env.blockManager.initialize(applicationId)
具体的初始化过程,请参阅第4章。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Hadoop2.7实战v1.0之动态添加DataNode和NodeManager节点(不修改dfs.replication)
Hadoop2.7实战v1.0之动态添加DataNode和NodeManager节点(不修改dfs.replication)【终极版】 0.添加大文件到hdfs系统中去,是为了后期的动态添加datanode的机器,datanode机器的数据量不均衡,为了模拟均衡实验 点击(此处)折叠或打开 [root@sht-sgmhadoopnn-01 ~]# hadoop fs -put /root/oracle-j2sdk1.7-1.7.0+update67-1.x86_64.rpm /testdir [root@sht-sgmhadoopnn-01 ~]# hadoop fs -put /root/012_HDFS.avi /testdir [root@sht-sgmhadoopnn-01 ~]# hadoop fs -put /root/016_Hadoop.avi /testdir ###3个datanode节点数据量和数据块相等 1.修改系统hostname(通过hostname和/etc/sysconfig/network进行修改) 点击(此处)折叠或打开 [root@sht-sgmh...
- 下一篇
Cloudera Certified Administrator for Apache Hadoop(CCAH认证)
Exam Sections and Blueprint 1. HDFS (17%) Describe the function of HDFS daemons Describe the normal operation of an Apache Hadoop cluster, both in data storage and in data processing Identify current features of computing systems that motivate a system like Apache Hadoop Classify major goals of HDFS Design Given a scenario, identify appropriate use case for HDFS Federation Identify components and daemon of an HDFS HA-Quorum cluster Analyze the role of HDFS security (Kerberos) Determine the best ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS6,CentOS7官方镜像安装Oracle11G
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- 设置Eclipse缩进为4个空格,增强代码规范
- Mario游戏-低调大师作品
- MySQL8.0.19开启GTID主从同步CentOS8
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果