spark2.1.0之配置与源码分析
任何优秀的软件或服务都会提供一些配置参数,这些配置参数有些是内置的,有些则是可以由用户配置的。对于熟悉Java的开发人员来说,对JVM进行性能调优是一个经常需要面对的工作,这个过程常常伴随着各种JVM参数的调整与测试。之所以将这些参数交给具体的开发人员去调整,是因为软件或者服务的提供者也无法保证给定的默认参数是最符合用户应用场景与软硬件环境的。一个简单的例子:当用户的QPS发生变化时,对于Web服务的JVM来说也应当相应调整内存的大小或限制。
Spark作为一款优秀的计算框架,也配备了各种各样的系统配置参数(例如:spark.master,spark.app.name,spark.driver.memory,spark.executor.memory等)。通过这些配置参数可以定义应用的名称、使用的部署模式、调度模式、executor数量、executor的内核数、driver或executor的内存大小、采用的内存模型等。
SparkConf是Spark的配置类,这个类在Spark的历史版本中已经存在很久了,Spark中的每一个组件都直接或者间接的使用着它所存储的属性,这些属性都存储在如下的数据结构中:
private val settings = new ConcurrentHashMap[String, String]()
由以上代码的泛型[1] 可以看出Spark的所有配置,无论是key还是value都是String类型。Spark的配置通过以下三种方式获取:
- 来源于系统参数(即使用System.getProperties获取的属性)中以spark.作为前缀的那部分属性;
- 使用SparkConf的API进行设置;
- 从其它SparkConf中克隆。
下面将具体说明这三种方式的实现。
系统属性中的配置
在SparkConf中有一个Boolean类型的构造器属性loadDefaults,当loadDefaults为true时将会从系统属性中加载Spark配置,代码如下:
if (loadDefaults) { loadFromSystemProperties(false) } private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = { // Load any spark.* system properties for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) { set(key, value, silent) } this }
以上代码调用了Utils工具类[2] 的getSystemProperties方法,其作用为获取系统的键值对属性。loadFromSystemProperties方法在获取了系统属性后,使用Scala守卫过滤出其中以“spark.”字符串为前缀的key和value并且调用set方法(见代码清单3-1)最终设置到settings中。
代码清单3-1 SparkConf中set方法的实现
private[spark] def set(key:String, value: String, silent: Boolean): SparkConf = { if (key == null) { throw newNullPointerException("nullkey") } if (value == null) { throw newNullPointerException("nullvalue for " + key) } if (!silent) { logDeprecationWarning(key) } settings.put(key,value) this }
使用SparkConf配置的API
给SparkConf添加配置的一种常见方式是使用SparkConf中提供的API。其中有些API最终实际调用了set的重载方法,见代码清单3-2。
代码清单3-2 SparkConf中重载的set方法
def set(key:String, value: String): SparkConf = { set(key,value, false) }
可以看到代码清单3-2中的set方法实际也是调用了代码清单3-1中的set方法。
SparkConf中的setMaster、setAppName、setJars、setExecutorEnv、setSparkHome、setAll等方法最终都是通过代码清单3-2中的set方法完成Spark配置的,本书以其中最为常用的setMaster和setAppName为例,用代码清单3-3和代码清单3-4来展示他们的实现。
代码清单3-3 设置Spark的部署模式的配置方法setMaster
def setMaster(master: String): SparkConf = { set("spark.master", master) }
代码清单3-4 设置Spark的应用名称的配置方法setAppName
def setAppName(name: String): SparkConf = { set("spark.app.name", name) }
克隆SparkConf配置
有些情况下,同一个SparkConf实例中的配置信息需要被Spark中的多个组件共用,例如:组件A中存在一个SparkConf实例a,组件B中也很需要实例a中的配置信息,这时该如何处理?我们往往首先想到的方法是将SparkConf实例定义为全局变量或者通过参数传递给其它组件,但是这会引入并发问题。虽然settings是线程安全的ConcurrentHashMap类,而且ConcurrentHashMap也被证明是高并发下性能表现不错的数据结构,但是只要存在并发就一定会有性能的损失问题。我们可以新建一个SparkConf实例b,并将a中的配置信息全部拷贝到b中,这种方式显然不是最优雅的,复制代码会散落在程序的各个角落。现在是时候阅读下SparkConf的构造器了,代码如下所示:
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging withSerializable { //省略无关代码 def this() = this(true)
SparkConf继承了Cloneable特质并实现了clone方法,clone方法(见代码清单3-5)的实现跟我们所讨论的方式是一样的,并且通过Cloneable特质提高了代码的可复用性。
代码清单3-5 克隆SparkConf配置
override def clone: SparkConf ={ val cloned = new SparkConf(false) settings.entrySet().asScala.foreach { e => cloned.set(e.getKey(),e.getValue(), true) } cloned }
这样我们就可以在任何想要使用SparkConf的地方使用克隆方式来优雅的编程了。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
流计算独享模式正式邀测
流计算 - 不止于流 流计算&独享模式 阿里云流计算(Alibaba Cloud StreamCompute,Powered by Blink)是一个一站式、高性能、稳定、易用的流式大数据处理平台,通过它,您可以快速搭建具有亿级QPS处理能力的流式大数据系统。 近期流计算发布了一种新的售卖模式:独享模式。独享模式中,同时开放了一些针对数据湖场景的新功能: ETL - 数据清洗,数据同步 数据分析 数据湖 Data Lake并不是一个新的概念,早在2011年就被提出,作为数仓的补充。 数据湖是一个中心化的存储,能够存储任意规模的结构化与半结构化数据。数据湖中的数据,不必经过结构化过程,即可进行各种类型的分析,如可视化,大数据处理,实时数据分析和机器学习。 数仓&数据湖 与数仓相比,数据湖中数据有以
- 下一篇
HBase数据库相关基本知识
HBase数据库相关知识 1、 HBase相关概念模型 l 表(table),与关系型数据库一样就是有行和列的表 l 行(row),在表里数据按行存储、行由行键(rowkey)唯一标识,没有数据类型统一为byte[]数组 l 列族(column family),行里的数据按列族分组String类型,每个表必须至少有一个列族 l 列限定符(column qualifier),列族里的数据用列限定符定位。类似关系型数据库里面的列,不必事前定义,没有数据类型同样是byte[]数组 l 单元(cell),单元值我们真实存储的数据(value),没有数据类型byte[]数组 l 时间版本(version),单元值有时间版本,时间版本用时间戳标识long类型(默认保留3个数据版本) 2、 数据存储模型 Hbase集群数据存储底层使用HDFS保障数据
相关文章
文章评论
共有0条评论来说两句吧...