[Spark]Spark RDD 指南三 弹性分布式数据集(RDD)
Spark2.3.0 版本: Spark2.3.0 创建RDD
Spark的核心概念是弹性分布式数据集(RDD),RDD是一个可容错、可并行操作的分布式元素集合。有两种方法可以创建RDD对象:
- 在驱动程序中并行化操作集合对象来创建RDD
- 从外部存储系统中引用数据集(如:共享文件系统、HDFS、HBase或者其他Hadoop支持的数据源)。
1. 并行化集合
通过在驱动程序中的现有集合上调用JavaSparkContext
的parallelize
方法创建并行化集合(Parallelized collections)。集合的元素被复制以形成可以并行操作的分布式数据集。 例如,下面是如何创建一个包含数字1到5的并行化集合:
Java版本:
List<Integer> list = Arrays.asList(1,2,3,4,5); JavaRDD<Integer> rdd = sc.parallelize(list);
Scala版本:
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
Python版本:
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
RDD一旦创建,分布式数据集(distData)可以并行操作。 例如,我们可以调用distData.reduce((a,b) - > a + b)
来实现对列表元素求和。 我们稍后介绍分布式数据集的操作。
并行化集合的一个重要参数是分区(partition)数,将分布式数据集分割成多少分区。Spark集群中每个分区运行一个任务(task)。典型场景下,一般为每个CPU分配2-4个分区。但通常而言,Spark会根据你集群的状况,自动设置分区数。当然,你可以给parallelize
方法传递第二个参数来手动设置分区数(如:sc.parallelize(data, 10)
)。注意:Spark代码里有些地方仍然使用分片(slice)这个术语(分区的同义词),主要为了保持向后兼容。
2. 外部数据集
Spark可以从Hadoop支持的任何存储数据源创建分布式数据集,包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。Spark可以支持文本文件,SequenceFiles以及任何其他Hadoop 输入格式。
文本文件RDD可以使用SparkContext
的textFile
方法创建。该方法根据URL获取文件(机器上的本地路径,或hdfs://
,s3n://
等等). 下面是一个示例调用:
Java版本:
JavaRDD<String> distFile = sc.textFile("data.txt");
Scala版本:
scala> val distFile = sc.textFile("data.txt") distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
Python版本:
>>> distFile = sc.textFile("data.txt")
一旦创建完成,distFiile 就能做数据集操作。例如,我们可以用下面的方式使用 map 和 reduce 操作将所有行的长度相加:
distFile.map(s -> s.length()).reduce((a, b) -> a + b);
Spark读文件时一些注意事项:
(1) 如果使用本地文件系统路径,在所有工作节点上该文件必须都能用相同的路径访问到。要么复制文件到所有的工作节点,要么使用网络的方式共享文件系统。
(2) Spark 所有基于文件的输入方法,包括 textFile
,能很好地支持文件目录,压缩过的文件和通配符。例如,你可以使用:
textFile("/my/directory") textFile("/my/directory/*.txt") textFile("/my/directory/*.gz")
(3) textFile
方法也可以选择第二个可选参数来控制文件分区(partitions)数目,默认情况下,Spark为每一个文件块创建一个分区(HDFS中分块大小默认为128MB),你也可以通过传递一个较大数值来请求更多分区。 注意的是,分区数目不能少于分块数目。
除了文本文件,Spark的Java API还支持其他几种数据格式:
(1) JavaSparkContext.wholeTextFiles可以读取包含多个小文本文件的目录,并将它们以(文件名,内容)键值对返回。 这与textFile相反,textFile将在每个文件中每行返回一条记录。
JavaPairRDD<String, String> rdd = sc.wholeTextFiles("/home/xiaosi/wholeText"); List<Tuple2<String, String>> list = rdd.collect(); for (Tuple2<?, ?> tuple : list) { System.out.println(tuple._1() + ": " + tuple._2()); }
(2) 对于SequenceFiles,可以使用SparkContext的sequenceFile [K,V]方法,其中K和V是文件中的键和值的类型。 这些应该是Hadoop的Writable接口的子类,如IntWritable和Text。
(3) 对于其他Hadoop InputFormats,您可以使用JavaSparkContext.hadoopRDD方法,该方法采用任意JobConf和输入格式类,键类和值类。 将这些设置与使用输入源的Hadoop作业相同。 您还可以使用基于“新”MapReduce API(org.apache.hadoop.mapreduce)的InputFormats的JavaSparkContext.newAPIHadoopRDD。
(4) JavaRDD.saveAsObjectFile 和 SparkContext.objectFile 支持保存一个RDD,保存格式是一个简单的 Java 对象序列化格式。这是一种效率不高的专有格式,如 Avro,它提供了简单的方法来保存任何一个 RDD。
原文:http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MaxCompute(原ODPS)开发入门指南——数据开发工具篇
MaxCompute(原ODPS)开发入门指南——数据开发工具篇 写在最前面 >>>进入了解更多>>>阿里云数加·MaxCompute大数据计算服务. 大家在使用大数据计算服务MaxCompute时,最头疼就是我现在已有的数据如何快速上云?我的日志数据如何采集到MaxCompute上?等等。。。具体详见《MaxCompute(原ODPS)开发入门指南——数据上云篇》。 但是数据在MaxCompute上了之后,问题又来了,我怎么基于上面进行快速的数据开发,构建大数据仓库。本文就重点为大家推荐和介绍开发工具:① 大数据开发套件Data IDE; ② MaxCompute Studio。 大数据开发套件Data IDE 大数据开发套件基于MaxCompute强大的计算存储能力,提供多人协作开发能力且支持百万级别任
- 下一篇
Hive内置运算函数,自定义函数(UDF)和Transform
4.Hive函数 4.1 内置运算符 内容较多,见《Hive官方文档》 4.2 内置函数 内容较多,见《Hive官方文档》 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF 测试各种内置函数的快捷方法: 1、创建一个dual表 create table dual(id string); 2、load一个文件(一行,一个空格)到dual表 hive> load data local inpath'/home/tuzq/software/hivedata/dual.txt' into table dual; 其中dual.txt里面只是一个空格 3、select substr('angelababy',2,3) from dual; 4.3 Hive自定义函数和Transform 当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。 4.3.1 自定义函数类别 UDF 作用于单个数据行,产生一个数据行作为...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS6,CentOS7官方镜像安装Oracle11G
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- 设置Eclipse缩进为4个空格,增强代码规范
- Mario游戏-低调大师作品
- MySQL8.0.19开启GTID主从同步CentOS8