您现在的位置是:首页 > 文章详情

SparkSubmit object分析

日期:2021-04-22点击:501

org.apache.spark.deploy.SparkSubmit分析

  • Object SparkSubmit类图分析

    SparkSubmit object分析
    结合spark的log4j来说,printStream为error,log4j的配置也是System.error,导致做日志收集的时候莫名其妙。。。。
    解决方案如下,采用重新构造log4j的appender:

    Logger.getRootLogger.removeAllAppenders() Logger.getRootLogger.setLevel(Level.INFO) val console = new ConsoleAppender() console.setTarget("System.out") val patternLayout = new PatternLayout() patternLayout.setConversionPattern("%c{1}: %m%n") console.setLayout(patternLayout) console.setEncoding("UTF-8") console.setName("console") console.activateOptions() Logger.getRootLogger.addAppender(console)
  • main方法执行

    1. 创建org.apache.spark.deploy.SparkSubmit类,重写parseArguments、logInfo、logWarning、doSubmit方法,执行doSubmit方法。
    2. 调用duSubmit方法,实际调用org.apache.spark.deploy.SparkSubmit类的submit对象。
  • 执行doSubmit方法

    1. 初始化log4j,查找classloader中org/apache/spark/log4j-defaults.properties,如果存在,PropertyConfigurator.configure("org/apache/spark/log4j-defaults.properties")。如果不在输出Spark was unable to load org/apache/spark/log4j-defaults.properties
    2. 解析提交参数,返回SparkSubmitArguments对象,具体实现方式如图:
      SparkSubmit object分析
    3. 如果verbose=true,输出参数信息
    4. 通过action,执行不同的启动方法,这里以SparkSubmitAction.SUBMIT为例。
    5. 执行submit方法。首先判断master.startsWith("spark://") && deployMode == "cluster",这里我们应该是false,执行doRun方法。查看是否设置了proxyUser,如果是设置hdfs的security(一些大数据平台修改了底层认证,如tbds、fi等,应该相对应在这里的源码做了处理)。执行runMain方法。
  • runMain方法

    1. 执行prepareSubmitEnvironment
    2. 设置classloader,如果设置了spark.driver.userClassPathFirst=true,创建出ChildFirstURLClassLoader,否则创建出MutableURLClassLoader。
    3. 把childClasspath加载到当前的classloader
    4. 利用反射初始化mainclass,根据mainclass的类型创建出不同的SparkApplication,scala.App的是JavaMainApplication,其他的则用SparkApplication
    5. 执行application start()方法。
  • 执行prepareSubmitEnvironment

    1. 创建SparkConf!!!,将system的spark开头的env参数加载。
    2. 根据args.master设置clusterManager=1(YARN),根据args.deployMode,如果没有填,默认是client,如果填了cluster,默认是cluster。如果master写了yarn-cluster,默认为yarn,cluster,测试org.apache.spark.deploy.yarn.YarnClusterApplication是否可以被加载。默认情况下可能找不到这个类,因为默认没有加入到maven编译内,尴尬,只能手动加入!!!
    3. args.sparkProperties加入到sparkConf
    4. 创建org.apache.hadoop.conf.Configuration,将sparkConf中以spark.hadoop.开头的参数截取,添加到configuration中,设置默认的io.file.buffer.size65536
    5. 创建临时目录,地址在System.getProperty("java.io.tmpdir"),对于linux系统一般为/tmp目录。目录名称为spark-#{UUID},添加ShutdownHookManager,当程序结束时清理目录。
    6. 设置principal,keytab。在sparkconf中存入,并且在hdfs登录。
    7. 如果deployMode是client,移动--jars,--files,--pyFiles,--archives到第5步创建好的目录内,并且将这些参数的地址修改为临时目录下的真是地址。
    8. 以下就是添加相关参数。
    9. childMainClass如果是client模式,最终为我们所写的main函数,如果是cluster模式,最终为org.apache.spark.deploy.yarn.YarnClusterApplication
  • sparkconf内容变化

    1. 加载系统配置,加载spark.开头的参数
    2. set命令行加载的参数
    3. 设置spark.yarn.keytab,spark.yarn.principal参数
    4. 加载SparkSubmit496-555行的参数,参数的具体值根据命令行设置
    5. 重复2操作,设置不存在的命令行参数??why??
    6. 将附件做文件路径解析后的路径set
  • childClasspath变化过程

    1. 如果是client模式,把提交的jar和--jars的jar加入classpath
    2. 如果是yarn模式且deploy为cluster时,把提交的jar和--jars的jar加入classpath
  • childArgs变化

    1. 如果是client模式,把提交命令行的args参数加入到childArgs
    2. 把所有的OptionAssigner加入childArgs
    3. 如果是yarn-cluster模式,加入--class参数,--jar参数,把提交命令行的args参数加入到childArgs
原文链接:https://blog.51cto.com/u_5530261/2724843
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章