Learning Spark: Simple RDD Operators
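collect

collect returns all elements of the RDD as an array on the driver.

    scala> var input = sc.parallelize(Array(-1, 0, 1, 2, 2))
    input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:27

    scala> var result = input.collect
    result: Array[Int] = Array(-1, 0, 1, 2, 2)

count, countByValue

count returns the number of elements in the RDD; countByValue returns how many times each value occurs.

    scala> var input = sc.parallelize(Array(-1, 0, 1, 2, 2))

    scala> var result = input.count
    result: Long = 5

    scala> var result = input.countByValue
    result: scala.collection.Map[Int,Long] = Map(0 -> 1, 1 -> 1, 2 -> 2, -1 -> 1)

take, top, takeOrdered

take returns the first N elements of the RDD. Asking for more elements than the RDD holds returns all of them.
takeOrdered returns the N smallest elements in ascending order by default; a different Ordering can be supplied.
top returns the N largest elements in descending order.

    scala> var input = sc.parallelize(Array(1, 2, 3, 4, 9, 8, 7, 5, 6))

    scala> var result = input.take(6)
    result: Array[Int] = Array(1, 2, 3, 4, 9, 8)

    scala> var result = input.take(20)
    result: Array[Int] = Array(1, 2, 3, 4, 9, 8, 7, 5, 6)

    scala> var result = input.takeOrdered(6)
    result: Array[Int] = Array(1, 2, 3, 4, 5, 6)

    scala> var result = input.takeOrdered(6)(Ordering[Int].reverse)
    result: Array[Int] = Array(9, 8, 7, 6, 5, 4)

    scala> var result = input.top(6)
    result: Array[Int] = Array(9, 8, 7, 6, 5, 4)

filter

filter takes a function that returns a Boolean and returns a new RDD containing only the elements for which that function returns true.

    scala> var input = sc.parallelize(Array(-1, 0, 1, 2))

    scala> var result = input.filter(_ > 0).collect()
    result: Array[Int] = Array(1, 2)

map, flatMap

map applies a function to every element and produces a new RDD. flatMap is similar, but it flattens the results: the sequences returned for the individual elements are concatenated into a single sequence.

    scala> var input = sc.parallelize(Array(-1, 0, 1, 2))

    scala> var result = input.map(_ + 1).collect
    result: Array[Int] = Array(0, 1, 2, 3)

    scala> var result = input.map(x => x.to(3)).collect
    result: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(-1, 0, 1, 2, 3), Range(0, 1, 2, 3), Range(1, 2, 3), Range(2, 3))

    scala> var result = input.flatMap(x => x.to(3)).collect
    result: Array[Int] = Array(-1, 0, 1, 2, 3, 0, 1, 2, 3, 1, 2, 3, 2, 3)

distinct

distinct removes duplicate elements from the RDD.

    scala> var input = sc.parallelize(Array(-1, 0, 1, 2, 2))

    scala> var result = input.distinct.collect
    result: Array[Int] = Array(0, 1, 2, -1)

reduce

reduce aggregates all elements of the RDD with the given function.

    scala> var input = sc.parallelize(Array(-1, 0, 1, 2))

    scala> var result = input.reduce((x, y) => { println(x, y); x + y })
    (-1,1)   // processes -1 and 1, giving 0; remaining elements: {0, 2}
    (0,2)    // the result above is 0; next 0 and 2 are processed, giving 2; remaining elements: {0}
    (2,0)    // the result above is 2; next (2, 0) is processed, giving 2; remaining elements: {}
    result: Int = 2

The pairing order printed here comes from one particular run. Spark reduces each partition separately and then merges the partial results, so the order depends on how the data is partitioned, and the function passed to reduce should be commutative and associative.

sample, takeSample

sample draws a random sample from the RDD. The first parameter, withReplacement, controls whether sampling is done with replacement: true means with replacement, so the same element may appear more than once in the result; false means without replacement. The second parameter, fraction, is the sampling rate. Without replacement it is the probability of selecting each element, a decimal between 0 and 1; with replacement it is the expected number of times each element is selected. In both cases the size of the result is approximate, not exact.
takeSample is similar but returns an Array. Its first parameter is withReplacement and its second is the number of samples to take.

    scala> var rdd = sc.parallelize(1 to 20)

    scala> rdd.sample(true, 0.5).collect
    res33: Array[Int] = Array(6, 8, 13, 15, 17, 17, 17, 18, 20)

    scala> rdd.sample(false, 0.5).collect
    res35: Array[Int] = Array(1, 3, 10, 11, 12, 13, 14, 17, 18)

    scala> rdd.sample(true, 1).collect
    res44: Array[Int] = Array(2, 2, 3, 5, 6, 6, 8, 9, 9, 10, 10, 10, 14, 15, 16, 17, 17, 18, 19, 19, 20, 20)

    scala> rdd.sample(false, 1).collect
    res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

    scala> rdd.takeSample(true, 3)
    res1: Array[Int] = Array(1, 15, 19)

    scala> rdd.takeSample(false, 3)
    res2: Array[Int] = Array(7, 16, 6)

collectAsMap, countByKey, lookup

collectAsMap converts a pair RDD into a Map. If the same key occurs more than once, the later value overwrites the earlier one.
countByKey counts how many times each key occurs.
lookup returns all values for the given key.

    scala> var input = sc.parallelize(List((1, "1"), (1, "one"), (2, "two"), (3, "three"), (4, "four")))

    scala> var result = input.collectAsMap
    result: scala.collection.Map[Int,String] = Map(2 -> two, 4 -> four, 1 -> one, 3 -> three)

    scala> var result = input.countByKey
    result: scala.collection.Map[Int,Long] = Map(1 -> 2, 2 -> 1, 3 -> 1, 4 -> 1)

    scala> var result = input.lookup(1)
    result: Seq[String] = WrappedArray(1, one)

    scala> var result = input.lookup(2)
    result: Seq[String] = WrappedArray(two)

groupBy, keyBy

groupBy applies the given function to each element to produce a key, forming an RDD of key-value pairs, and then groups the elements that share a key.
keyBy attaches a key, computed by the given function, to each value.

    scala> var rdd = sc.parallelize(List("A1", "A2", "B1", "B2", "C"))

    scala> var result = rdd.groupBy(_.substring(0, 1)).collect
    result: Array[(String, Iterable[String])] = Array((A,CompactBuffer(A1, A2)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C)))

    scala> var rdd = sc.parallelize(List("hello", "world", "spark", "is", "fun"))

    scala> var result = rdd.keyBy(_.length).collect
    result: Array[(Int, String)] = Array((5,hello), (5,world), (5,spark), (2,is), (3,fun))

keys, values

keys returns an RDD of the keys of a pair RDD; values returns an RDD of its values.

    scala> var input = sc.parallelize(List((1, "1"), (1, "one"), (2, "two"), (3, "three"), (4, "four")))

    scala> var result = input.keys.collect
    result: Array[Int] = Array(1, 1, 2, 3, 4)

    scala> var result = input.values.collect
    result: Array[String] = Array(1, one, two, three, four)

mapValues

mapValues applies a function to each value of a key-value RDD and leaves the keys unchanged.

    scala> var input = sc.parallelize(List((1, "1"), (1, "one"), (2, "two"), (3, "three"), (4, "four")))

    scala> var result = input.mapValues(_ * 2).collect
    result: Array[(Int, String)] = Array((1,11), (1,oneone), (2,twotwo), (3,threethree), (4,fourfour))

union, intersection, subtract, cartesian

union merges two datasets without removing duplicates.
subtract removes from the first dataset every element that also exists in the second.
intersection returns the elements common to both datasets, with duplicates removed.
cartesian returns the Cartesian product of the two datasets.

    scala> var rdd1 = sc.parallelize(Array(-1, 1, 1, 2, 3))

    scala> var rdd2 = sc.parallelize(Array(0, 1, 2, 3, 4))

    scala> var result = rdd1.union(rdd2).collect
    result: Array[Int] = Array(-1, 1, 1, 2, 3, 0, 1, 2, 3, 4)

    scala> var result = rdd1.intersection(rdd2).collect
    result: Array[Int] = Array(1, 2, 3)

    scala> var result = rdd1.subtract(rdd2).collect
    result: Array[Int] = Array(-1)

    scala> var result = rdd1.cartesian(rdd2).collect
    result: Array[(Int, Int)] = Array((-1,0), (-1,1), (-1,2), (-1,3), (-1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (2,0), (2,1), (2,2), (2,3), (2,4), (3,0), (3,1), (3,2), (3,3), (3,4))

Author: Endless2010. Source: 51CTO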
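
As a supplement to the reduce section: the order in which reduce pairs elements depends on how the RDD is split into partitions, so the result is only stable when the function is commutative and associative. The following is a minimal sketch of that point, not part of the original article. It assumes spark-core is on the classpath and runs Spark in local mode; the names ReduceSketch and sumWithPartitions are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ReduceSketch {
  // Hypothetical helper: splits `values` into `numSlices` partitions
  // and sums them with RDD.reduce.
  def sumWithPartitions(sc: SparkContext, values: Seq[Int], numSlices: Int): Int =
    sc.parallelize(values, numSlices).reduce(_ + _)

  def main(args: Array[String]): Unit = {
    // local[2] runs Spark inside this JVM with two worker threads.
    val conf = new SparkConf().setAppName("ReduceSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val values = Seq(-1, 0, 1, 2)
      // Addition is commutative and associative, so the sum does not
      // depend on how many partitions the data is split into.
      for (n <- 1 to 4) {
        println(s"partitions=$n sum=${sumWithPartitions(sc, values, n)}")
      }
    } finally {
      sc.stop()
    }
  }
}
```

Each printed line should report sum=2, the same value as in the reduce transcript. A function such as subtraction, which is neither commutative nor associative, gives no such guarantee and can return different results for different partition counts.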