您现在的位置是:首页 > 文章详情

spark2.1.0之源码分析——RPC配置TransportConf

日期:2018-07-01点击:360
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80888076

      在《Spark2.1.0之内置RPC框架》提到TransportContext中的TransportConf给Spark的RPC框架提供配置信息,它有两个成员属性——配置提供者conf和配置的模块名称module。这两个属性的定义如下:

 private final ConfigProvider conf; private final String module;

其中conf是真正的配置提供者,其类型ConfigProvider是一个抽象类,见代码清单1。

代码清单1  ConfigProvider的实现
public abstract class ConfigProvider { public abstract String get(String name); public String get(String name, String defaultValue) { try { return get(name); } catch (NoSuchElementException e) { return defaultValue; } } public int getInt(String name, int defaultValue) { return Integer.parseInt(get(name, Integer.toString(defaultValue))); } public long getLong(String name, long defaultValue) { return Long.parseLong(get(name, Long.toString(defaultValue))); } public double getDouble(String name, double defaultValue) { return Double.parseDouble(get(name, Double.toString(defaultValue))); } public boolean getBoolean(String name, boolean defaultValue) { return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue))); } }

从代码清单1,可以看到ConfigProvider中包括get、getInt、getLong、getDouble、getBoolean等方法,这些方法都是基于抽象方法get获取值,经过一次类型转换而实现。这个抽象的get方法将需要子类去实现。

         Spark通常使用SparkTransportConf创建TransportConf,其实现见代码清单2。

代码清单2  SparkTransportConf的实现
object SparkTransportConf { private val MAX_DEFAULT_NETTY_THREADS = 8 def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = { val conf = _conf.clone val numThreads = defaultNumThreads(numUsableCores) conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString) conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString) new TransportConf(module, new ConfigProvider { override def get(name: String): String = conf.get(name) }) } private def defaultNumThreads(numUsableCores: Int): Int = { val availableCores = if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors() math.min(availableCores, MAX_DEFAULT_NETTY_THREADS) } } 

从代码清单2看到,可以使用SparkTransportConf的fromSparkConf方法来构造TransportConf。传递的三个参数分别为SparkConf、模块名module及可用的内核数numUsableCores。如果numUsableCores小于等于0,那么线程数是系统可用处理器的数量,不过系统的内核数不可能全部用于网络传输使用,所以这里还将分配给网络传输的内核数量最多限制在8个。最终确定的线程数将被用于设置客户端传输线程数(spark.$module.io.clientThreads属性)和服务端传输线程数(spark.$module.io.serverThreads属性)。fromSparkConf最终构造TransportConf对象时传递的ConfigProvider为实现了get方法的匿名的内部类,get的实现实际是代理了SparkConf的get方法。


关于《Spark内核设计的艺术 架构设计与实现

经过近一年的准备,基于Spark2.1.0版本的《 Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:


纸质版售卖链接如下:
原文链接:https://yq.aliyun.com/articles/632425
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章