Spark学习之编程进阶——累加器与广播(5)
Spark学习之编程进阶——累加器与广播(5)
1. Spark中两种类型的共享变量:累加器(accumulator)与广播变量(broadcast variable)。累加器对信息进行聚合,而广播变量用来高效分发较大的对象。
2. 共享变量是一种可以在Spark任务中使用的特殊类型的变量。
3. 累加器的用法:
- 通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumlator[T]对象,其中T是初始值initialValue的类型。
- Spark闭包里的执行器代码可以使用累加器的+=方法(在Java中是add)增加累加器的值。
-
驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue()来访问累加器的值。
Python中实现累加空行
file = sc.textFile(inputFile) #创建Accumulator[Int]并初始化为0 blankLines = sc.accumulator(0) def extractCallSigns(Line): globle blankLines #访问全局变量 if (line == ""): blankLines += 1 return line.split("") callSigns = file.flatMap(extractCallSigns) callSigns.saveAsTextFile(outputDir + "/callsigns") print "Blank lines:%d" % blankLines.value
4. Spark的广播变量,它可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。
Scala代码使用广播变量查询国家
//查询RDD contactCounts中的呼号的对应位置。将呼号前缀 //读取为国家代码进行查询 val signPrefixes = sc.broadcast(loadCallSignTable()) val countryContactCounts = contactCounts.map{case (sign,count) => val country = lookupInArray(sign,signPrefixes.value) (country,count) }.reduceByKey((x,y) => x+y) countryContactCounts.saveAsTextFile(outputDir + "/countries.text")
5. Spark在RDD上提供pipe()方法。Spark的pipe()方法可以让我们使用任意一种语言实现Spark作业中的部分逻辑,只要它的读写Unix标准流就行。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Hadoop平台配置汇总
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/50524657 Hadoop平台配置汇总 @(Hadoop) Hadoop hadoop-env.sh和yarn-env.sh中export log和pid的dir即可和JAVA_HOME。 core-site.xml <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://ns1</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/data2/hadoop/tmp</value> </property> <property> <name>ha.zookeeper.quorum</name&...
- 下一篇
Storm的数据处理编程单元:Bolt 学习整理
Bolt是Topology中的数据处理的单元,也是Storm针对处理过程的编程单元。Topology中所有的处理都是在这些Bolt中完成的,编程人员可以实现自定义的处理过程,例如,过滤、函数、聚集、连接等计算。如果是复杂的计算过程,往往需要多个步骤和使用多个Bolt。 Bolt可以将数据项发送至多个数据流(Stream)。编程人员首先可以使用OutputFieldsDeclarer类的declareStream()方法来声明多个流,指定数据将要发送到的流,然后使用SpoutOutputCollector的emit方法将数据发送。 当声明了一个Bolt的输入流后,可以从其他的组件中接收这些指定的流。当接收某个组件的所有流时,需要在程序中逐个声明接收的过程。InputDeclarer对象默认接收来自某组件默认的流。 //从名称为"1"的组件中接收默认的流。 declarer.shuffleGrouping("1") IBolt 和 IComponent接口 IBolt接口: //在组件的任务初被初始化时,由集群中的工作进程(worker)调用,prepare()用于实例化Bolt的已给运...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2全家桶,快速入门学习开发网站教程
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- 设置Eclipse缩进为4个空格,增强代码规范
- MySQL8.0.19开启GTID主从同步CentOS8