Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、uni...
1、以本地模式实战map和filter 2、以集群模式实战textFile和cache 3、对Job输出结果进行升和降序 4、union 5、groupByKey 6、join 7、reduce 8、lookup 1、以本地模式实战map和filter 以local的方式,运行spark-shell。 spark@SparkSingleNode:~$ cd /usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ pwd /usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$./spark-shell 从集合中创建RDD,spark中主要提供了两种函数:parallelize和makeRDD, scala>val rdd = sc.parallelize(List(1,2,3,4,5)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21 scala>val mappedRDD = rdd.map(2*_) mappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23 scala>mappedRDD.collect 得到 res0: Array[Int] = Array(2, 4, 6, 8, 10) scala> scala>val filteredRDD = mappedRDD.filter(_ > 4) 16/09/26 20:32:29 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on localhost:40688 in memory (size: 1218.0 B, free: 534.5 MB) 16/09/26 20:32:30 INFO spark.ContextCleaner: Cleaned accumulator 1 filteredRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:25 scala>filteredRDD.collect 注意,一般,生产环境和正宗的写法是。 scala>val filteredRDDAgain = sc.parallelize(List(1,2,3,4,5)).map(2 * _).filter(_ > 4).collect 2、以集群模式实战textFile和cache 启动hadoop集群 spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ jps 8457 Jps spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$sbin/start-dfs.sh 启动spark集群 spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$sbin/start-all.sh spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$./spark-shell --master spark://SparkSingleNode:7077 读取该文件 scala>val rdd = sc.textFile("/README.md") 使用count统计一下该文件的行数 scala> rdd.count took 7.018386 s res0: Long = 98 花了时间7.018386 s 通过观察RDD.scala源代码即可知道cache和persist的区别: def persist(newLevel: StorageLevel): this.type = { if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level") } sc.persistRDD(this) sc.cleaner.foreach(_.registerRDDForCleanup(this)) storageLevel = newLevel this } /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist() 可知: 1)RDD的cache()方法其实调用的就是persist方法,缓存策略均为MEMORY_ONLY; 2)可以通过persist方法手工设定StorageLevel来满足工程需要的存储级别; 3)cache或者persist并不是action; 附:cache和persist都可以用unpersist来取消 进行缓存 scala>rdd.cache res1: rdd.type = MapPartitionsRDD[1] at textFile at <console>:21 执行count,使得缓存生效 scala>rdd.count took 2.055063 s res2: Long = 98 花了时间2.055063 s 再执行,count took 0.583177 s res3: Long = 98 花了时间0.583177 s 总结,我们直接基于cache缓存后的数据,计算所消耗时间大大减少。 正在进行中的spark-shell 接着,对上面的RDD,进行wordcount操作 scala>val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_) wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:23 scala>wordcount.collect 通过saveAsTextFile把数据保存起来 res4: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (first,1), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (distribution.,1), (are,1), (params,1), (scala>,1), (DataFrames... scala>wordcount.saveAsTextFile("/result") 只是,仅仅对每行,做了wordcount而已。 3、对Job输出结果进行升和降序 升序 scala>val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(true).map(x => (x._2,x._1)).saveAsTextFile("/resultAscSorted") 同理,去下载,不多赘述。 变了 scala>val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortBy(true).map(x => (x._2,x._1)).saveAsTextFile("/resultAscSorted") <console>:23: error: type mismatch; found : Boolean(true) required: ((Int, String)) => ? val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortBy(true).map(x => (x._2,x._1)).saveAsTextFile("/resultAscSorted") ^ scala> 降序 scala>val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile("/resultDescSorted") 下载,同理 此刻,成功对Job输出结果进行了排序。 4、union union的使用 scala>val rdd1 = sc.parallelize(List(('a',1),('b',1))) rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:21 scala>val rdd2 = sc.parallelize(List(('c',1),('d',1))) rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[27] at parallelize at <console>:21 scala>rdd1 union rdd2 res6: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[28] at union at <console>:26 scala>val result = rdd1 union rdd2 result: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[29] at union at <console>:25 使用collect操作,查看一下执行结果 scala>result.collect res7: Array[(Char, Int)] = Array((a,1), (b,1), (c,1), (d,1)) 5、groupByKey scala>val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).groupByKey wordcount: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[32] at groupByKey at <console>:23 scala>wordcount.collect res8: Array[(String, Iterable[Int])] = Array((package,CompactBuffer(1)), (this,CompactBuffer(1)), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),CompactBuffer(1)), (Because,CompactBuffer(1)), (Python,CompactBuffer(1, 1)), (cluster.,CompactBuffer(1)), (its,CompactBuffer(1)), ([run,CompactBuffer(1)), (general,CompactBuffer(1, 1)), (YARN,,CompactBuffer(1)), (have,CompactBuffer(1)), (pre-built,CompactBuffer(1)), (locally.,CompactBuffer(1)), (locally,CompactBuffer(1, 1)), (changed,CompactBuffer(1)), (sc.parallelize(1,CompactBuffer(1)), (only,CompactBuffer(1)), (several,CompactBuffer(1)), (learning,,CompactBuffer(1)), (basic,CompactBuffer(1)), (first,CompactBuffer(1)), (This,CompactBuffer(1, 1)), (documentation,CompactBuffer(1, 1, 1)), (Confi... scala> 6、join 概念知识,参考 http://www.cnblogs.com/goforward/p/4748128.html scala>val rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4))) rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:21 scala>val rdd2 = sc.parallelize(List(('a',5),('a',6),('b',7),('b',8))) rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:21 scala>rdd1 join rdd2 res9: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = MapPartitionsRDD[37] at join at <console>:26 scala>val result = rdd1 join rdd2 result: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = MapPartitionsRDD[40] at join at <console>:25 scala>result.collect res10: Array[(Char, (Int, Int))] = Array((b,(3,7)), (b,(3,8)), (b,(4,7)), (b,(4,8)), (a,(1,5)), (a,(1,6)), (a,(2,5)), (a,(2,6))) scala> 可见,join操作,完全是一个笛卡尔积的操作。 7、reduce reduce本身啊,在RDD操作里,属于一个action类型的操作,会导致job作业的提交和执行。 scala>val rdd = sc.parallelize(List(1,2,3,4,5)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[41] at parallelize at <console>:21 scala>rdd.reduce(_+_) res11: Int = 15 8、lookup scala>val rdd2 = sc.parallelize(List(('a',5),('a',6),('b',7),('b',8))) rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:21 scala>rdd2.lookup('a') //返回一个seq, (5, 6) 是把a对应的所有元素的value提出来组成一个seq res12: Seq[Int] = WrappedArray(5, 6) 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5910869.html,如需转载请自行联系原作者