import
java.util.Arrays;
import
java.util.Iterator;
import
java.util.List;
import
org.apache.spark.SparkConf;
import
org.apache.spark.api.java.JavaPairRDD;
import
org.apache.spark.api.java.JavaRDD;
import
org.apache.spark.api.java.JavaSparkContext;
import
org.apache.spark.api.java.function.FlatMapFunction;
import
org.apache.spark.api.java.function.Function;
import
org.apache.spark.api.java.function.Function2;
import
org.apache.spark.api.java.function.PairFunction;
import
org.apache.spark.api.java.function.VoidFunction;
import
com.google.common.base.Optional;
import
scala.Tuple2;
/**
 * Collection of small Spark RDD demos (map, filter, flatMap, groupByKey, reduceByKey,
 * sortByKey, join, leftOuterJoin), each driven by a hard-coded in-memory dataset.
 *
 * <p>The anonymous-class signatures used here ({@code FlatMapFunction#call} returning an
 * {@code Iterable}, Guava {@code Optional} values from {@code leftOuterJoin}) match the
 * Spark 1.x Java API.
 */
public class Demo01 {

    /**
     * Creates a local Spark context, runs the left-outer-join demo, and stops the context.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Demo01").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            leftOutJoin(jsc);
        } finally {
            // Stop the context even if a job throws, so its resources are released.
            jsc.stop();
        }
    }

    /**
     * Doubles each of the numbers 1..8 and prints every result.
     *
     * <p>Printing happens inside {@code foreach}, i.e. inside Spark tasks; the output order
     * across partitions is not guaranteed.
     *
     * @param jsc active Spark context
     */
    private static void map(JavaSparkContext jsc) {
        List<Integer> lst = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        JavaRDD<Integer> numRDD = jsc.parallelize(lst);
        JavaRDD<Integer> resultRDD = numRDD.map(new Function<Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer num) throws Exception {
                return num * 2;
            }
        });
        resultRDD.foreach(new VoidFunction<Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer num) throws Exception {
                System.out.println(num);
            }
        });
    }

    /**
     * Keeps only the even numbers of 1..8 and prints them as a collected list.
     *
     * @param jsc active Spark context
     */
    private static void filter(JavaSparkContext jsc) {
        List<Integer> lst = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        JavaRDD<Integer> numRDD = jsc.parallelize(lst);
        JavaRDD<Integer> evenRDD = numRDD.filter(new Function<Integer, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(Integer num) throws Exception {
                return num % 2 == 0;
            }
        });
        System.out.println(evenRDD.collect());
    }

    /**
     * Splits each line on single spaces and prints the flattened word list.
     *
     * <p>{@code String.split} drops trailing empty strings, so the trailing space in
     * {@code "hi tim "} produces no empty word.
     *
     * @param jsc active Spark context
     */
    private static void flatMap(JavaSparkContext jsc) {
        List<String> lst = Arrays.asList("hi tim ", "hello girl", "hello spark");
        JavaRDD<String> lines = jsc.parallelize(lst);
        JavaRDD<String> resultRDD = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        System.out.println(resultRDD.collect());
    }

    /**
     * Groups (class, score) pairs by class and prints every score with its class key.
     *
     * <p>Printing happens inside {@code foreach}; ordering between keys is not guaranteed.
     *
     * @param jsc active Spark context
     */
    private static void groupByKey(JavaSparkContext jsc) {
        @SuppressWarnings("unchecked")
        List<Tuple2<String, Integer>> lst = Arrays.asList(
                new Tuple2<String, Integer>("class01", 100),
                new Tuple2<String, Integer>("class02", 101),
                new Tuple2<String, Integer>("class01", 199),
                new Tuple2<String, Integer>("class02", 121),
                new Tuple2<String, Integer>("class02", 120));
        JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst);
        JavaPairRDD<String, Iterable<Integer>> groupedRDD = classRDD.groupByKey();
        groupedRDD.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
                String classKey = tuple._1;
                for (Integer value : tuple._2) {
                    System.out.println("key:" + classKey + "\t" + "value:" + value);
                }
            }
        });
    }

    /**
     * Sums the scores per class and prints one line per class.
     *
     * <p>Printing happens inside {@code foreach}; ordering between keys is not guaranteed.
     *
     * @param jsc active Spark context
     */
    private static void reduceByKey(JavaSparkContext jsc) {
        @SuppressWarnings("unchecked")
        List<Tuple2<String, Integer>> lst = Arrays.asList(
                new Tuple2<String, Integer>("class01", 100),
                new Tuple2<String, Integer>("class02", 101),
                new Tuple2<String, Integer>("class01", 199),
                new Tuple2<String, Integer>("class02", 121),
                new Tuple2<String, Integer>("class02", 120));
        JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst);
        JavaPairRDD<String, Integer> resultRDD = classRDD.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });
        resultRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println("key:" + tuple._1 + "\t" + "value:" + tuple._2);
            }
        });
    }

    /**
     * Sorts (name, score) pairs by score ascending and prints the three lowest-scoring pairs.
     *
     * <p>The pairs are swapped to (score, name) so that {@code sortByKey} orders by score,
     * then swapped back before {@code take(3)}.
     *
     * @param jsc active Spark context
     */
    private static void sortByKey(JavaSparkContext jsc) {
        @SuppressWarnings("unchecked")
        List<Tuple2<String, Integer>> lst = Arrays.asList(
                new Tuple2<String, Integer>("tom", 60),
                new Tuple2<String, Integer>("kate", 80),
                new Tuple2<String, Integer>("kobe", 100),
                new Tuple2<String, Integer>("马蓉", 4),
                new Tuple2<String, Integer>("宋哲", 2),
                new Tuple2<String, Integer>("白百合", 3),
                new Tuple2<String, Integer>("隔壁老王", 1));
        JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst);
        JavaPairRDD<Integer, String> scoreFirstRDD = classRDD.mapToPair(
                new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)
                            throws Exception {
                        return new Tuple2<Integer, String>(tuple._2, tuple._1);
                    }
                });
        JavaPairRDD<Integer, String> sortedRDD = scoreFirstRDD.sortByKey();
        JavaPairRDD<String, Integer> nameFirstRDD = sortedRDD.mapToPair(
                new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)
                            throws Exception {
                        return new Tuple2<String, Integer>(tuple._2, tuple._1);
                    }
                });
        List<Tuple2<String, Integer>> result = nameFirstRDD.take(3);
        System.out.println(result);
    }

    /**
     * Inner-joins scores with names by student id and prints the two highest (score, name)
     * pairs.
     *
     * <p>Ids without a score (id 3 here) are dropped by the inner join.
     *
     * @param jsc active Spark context
     */
    private static void join(JavaSparkContext jsc) {
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, String>> names = Arrays.asList(
                new Tuple2<Integer, String>(1, "jack"),
                new Tuple2<Integer, String>(2, "rose"),
                new Tuple2<Integer, String>(3, "tom"),
                new Tuple2<Integer, String>(4, "赵丽颖"));
        JavaPairRDD<Integer, String> num2NamesRDD = jsc.parallelizePairs(names);
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 60),
                new Tuple2<Integer, Integer>(4, 100),
                new Tuple2<Integer, Integer>(2, 30));
        JavaPairRDD<Integer, Integer> num2scoresRDD = jsc.parallelizePairs(scores);
        // (id, (score, name))
        JavaPairRDD<Integer, Tuple2<Integer, String>> joinedRDD =
                num2scoresRDD.join(num2NamesRDD);
        JavaPairRDD<Integer, String> score2NameRDD = joinedRDD.mapToPair(
                new PairFunction<Tuple2<Integer, Tuple2<Integer, String>>, Integer, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Integer, String> call(
                            Tuple2<Integer, Tuple2<Integer, String>> tuple) throws Exception {
                        Integer score = tuple._2._1;
                        String name = tuple._2._2;
                        return new Tuple2<Integer, String>(score, name);
                    }
                });
        System.out.println(score2NameRDD.sortByKey(false).take(2));
    }

    /**
     * Left-outer-joins names with scores by student id and prints every student in descending
     * score order. Students without a score are reported as having 0.
     *
     * <p>The sorted result is collected to the driver before printing, because printing inside
     * {@code foreach} runs per partition and does not guarantee the order that
     * {@code sortByKey} established.
     *
     * @param jsc active Spark context
     */
    private static void leftOutJoin(JavaSparkContext jsc) {
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, String>> names = Arrays.asList(
                new Tuple2<Integer, String>(1, "jack"),
                new Tuple2<Integer, String>(2, "rose"),
                new Tuple2<Integer, String>(3, "tom"),
                new Tuple2<Integer, String>(4, "赵丽颖"));
        JavaPairRDD<Integer, String> num2NamesRDD = jsc.parallelizePairs(names);
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 60),
                new Tuple2<Integer, Integer>(4, 100),
                new Tuple2<Integer, Integer>(2, 30));
        JavaPairRDD<Integer, Integer> num2scoresRDD = jsc.parallelizePairs(scores);
        // (id, (name, Optional<score>)) -- the score is absent for ids with no score row.
        JavaPairRDD<Integer, Tuple2<String, Optional<Integer>>> joinedRDD =
                num2NamesRDD.leftOuterJoin(num2scoresRDD);
        JavaPairRDD<Integer, String> pairRDD = joinedRDD.mapToPair(
                new PairFunction<Tuple2<Integer, Tuple2<String, Optional<Integer>>>, Integer, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Integer, String> call(
                            Tuple2<Integer, Tuple2<String, Optional<Integer>>> tuple)
                            throws Exception {
                        String name = tuple._2._1;
                        // A missing score becomes 0. This is indistinguishable from a real 0 below.
                        Integer score = tuple._2._2.or(0);
                        return new Tuple2<Integer, String>(score, name);
                    }
                });
        List<Tuple2<Integer, String>> ranked = pairRDD.sortByKey(false).collect();
        for (Tuple2<Integer, String> tuple : ranked) {
            if (tuple._1.intValue() == 0) {
                System.out.println("name:" + tuple._2 + "\t" + "要努力了,你的成绩0分");
            } else {
                System.out.println("姓名:" + tuple._2 + "\t" + "分数:" + tuple._1);
            }
        }
    }
}