package com.cn.ctripotb;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

import java.util.ResourceBundle;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates launching multiple Spark output jobs from a single driver:
 * two saves run sequentially on the main thread, then two more run
 * concurrently on a fixed-size thread pool sharing the same SparkContext.
 *
 * Created by Administrator on 2016/9/12.
 */
public class HotelTest {
    // External configuration (e.g. the HDFS output root) is read from
    // filepath.properties on the classpath.
    static ResourceBundle rb = ResourceBundle.getBundle("filepath");

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MultiJobWithThread")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(sc.sc());

        // df must be final so the anonymous Callables below can capture it.
        final DataFrame df = getHotelInfo(hiveContext);

        // These two saves run on the main thread, so Spark schedules them
        // as two sequential jobs.
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file1",
                com.hadoop.compression.lzo.LzopCodec.class);
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file2",
                com.hadoop.compression.lzo.LzopCodec.class);
        // Submitting the next two saves from separate threads lets Spark
        // run them as concurrent jobs within the same SparkContext.
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(new Callable<Void>() {
            @Override
            public Void call() {
                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",
                        com.hadoop.compression.lzo.LzopCodec.class);
                return null;
            }
        });
        executorService.submit(new Callable<Void>() {
            @Override
            public Void call() {
                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",
                        com.hadoop.compression.lzo.LzopCodec.class);
                return null;
            }
        });
        executorService.shutdown();
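        // One way to keep the driver alive until the pooled jobs finish:
        // shutdown() only stops the pool from accepting new tasks and does
        // not block, so wait for termination before stopping the context,
        // otherwise the file3/file4 jobs could be aborted mid-write.
        try {
            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sc.stop();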
}
    // Loads the hotel dictionary table from Hive as a DataFrame.
    public static DataFrame getHotelInfo(HiveContext hiveContext) {
        String sql = "select * from common.dict_hotel_ol";
        return hiveContext.sql(sql);
    }
}