Spark学习之键值对(pair RDD)操作(3)
Spark学习之键值对(pair RDD)操作(3)
1. 我们通常从一个RDD中提取某些字段(如代表事件时间、用户ID或者其他标识符的字段),并使用这些字段为pair RDD操作中的键。
2. 创建pair RDD
1)读取本身就是键值对的数据 2)一个普通的RDD通过map()转为pair RDD,传递的函数需要返回键值对。 Python中使用第一个单词作为键创建出一个pair RDD
pairs = lines.amp(lambda x: (x.split(" ")[0],x))
Scala中使用第一个单词作为键创建出一个pair RDD
val pairs = lines.map(x=>(x.split(" ")(0),x))
3. pair RDD的转化操作
pair RDD可以使用所有标准RDD上的可能的转化操作,还有其他如下 reduceBykey(func) 合并具有相同键的值 groupByke() 对具有相同键的值进行分组 combineByKey( 使用不同的的返回类型合并具有相同键的值 createCombiner, mergeValue, mergCombiners, partitioner) mapValues(func) 对pair RDD中的每个值应用一个函数而不改变键 flatMapValues(func) 对pair RDD中的每个值应用一个返回迭代器的函数, 然后对返回的每个元素都生成一个对应原键的键值对记录。 通常用于符号化。 keys() 返回一个仅包含键的RDD values() 返回一个仅包含值的RDD sortByKey() 返回一个根据键排序的RDD
4. 针对两个pair RDD转化操作
subtractByKey 删掉RDD中键与other RDD中的键相同的元素 join 对两个RDD进行内连接 rightOuterJoin 对两个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接) leftOuterJoin 对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接) cogroup 将两个RDD中拥有相同键的数据分组到一起
5. pair RDD的行动操作
countByKey() 对每个键对应的元素分别计数 collectAsMap() 将结果以映射表的形式返回,以便查询 lookup(key) 返回给定键对应的所有值
6. 数据分区
控制数据分布以获得最少的网络传输可以极大地提升整体性能。 只有当数据集多次在诸如连这种基于键的操作中使用时,分区才有帮助。 Scala自定义分区方式
val sc = new SparkContext(...) val userData = sc.sequenceFile(UserID,UserInfo)("hdfs://...") .partitionBy(new HashPartitioner(100)) .persist()

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark学习之RDD编程(2)
Spark学习之RDD编程(2) 1. Spark中的RDD是一个不可变的分布式对象集合。 2. 在Spark中数据的操作不外乎创建RDD、转化已有的RDD以及调用RDD操作进行求值。 3. 创建RDD:1)读取一个外部数据集2)在驱动器程序里分发驱动器程序中的对象集合。 4. RDD支持的操作: 1)转换操作,由一个RDD生成一个新的RDD。 2)行动操作,对RDD进行计算结果,并把结果返回到驱动器程序中,或者把结果存储到外部存储系统(如HDFS)。 5. Spark程序或者shell会话都会按如下方式工作: 1)从外部数据创建出输入RDD。 2)使用诸如filter()这样的转化操作对RDD进行转化,以定义一个新的RDD。 3)告诉Spark对需要被重用的中间结果RDD执行persist()操作。 4)使用行动操作 (例如count()和first()等)来触发一次并行计算,Spark会对计算进行优化后在执行。 6. 创建RDD 快速创建RDD,把程序中一个已有的集合传给SparkContext的parallelize()方法,不过这种方法除了开发原型和测试时,这种方式用的并不多。...
- 下一篇
Spark学习之数据读取与保存(4)
Spark学习之数据读取与保存(4) 1. 文件格式 Spark对很多种文件格式的读取和保存方式都很简单。 如文本文件的非结构化的文件,如JSON的半结构化文件,如SequenceFile结构化文件。通过扩展名进行处理。 2. 读取/保存文本文件 Python中读取一个文本文件 input = sc.textfile("file:///home/holen/repos/spark/README.md") Scala中读取一个文本文件 val input = sc.textFile("file:///home/holen/repos/spark/README.md") Java中读取一个文本文件 JavaRDD<String> input = sc.textFile("file:///home/holen/repos/spark/README.md") saveAsTextFile()方法用了保存为文本文件 3. 读取/保存JSON文件 Python中读取JSON文件 import json data = input.map(lambda x: json.loads(x)) ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS8安装Docker,最新的服务器搭配容器使用
- 设置Eclipse缩进为4个空格,增强代码规范
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2全家桶,快速入门学习开发网站教程
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- MySQL8.0.19开启GTID主从同步CentOS8