/**
 * Loads one model's JSON data into Hive: creates the target database (if
 * missing), a JSON-SerDe staging table over the model's HDFS directory, and
 * a materialized table selected from it via LATERAL VIEW explode.
 *
 * Does nothing unless both `args.database` and `args.schema` are non-empty.
 *
 * NOTE(review): relies on an in-scope SparkContext `sc` (spark-shell style
 * script) — confirm it is available where this is run.
 *
 * @param args per-model Hive/HDFS configuration (database, schemas,
 *             model name, HDFS root path)
 */
def loadDataToHive(args: HiveConfig): Unit = {
  val loadPath = args.hdfsPath + args.modelName
  // Staging table over the raw JSON files.
  val tbJsonSerde = s"json_serde_${args.modelName}_table"
  // Materialized target table.
  val tb = s"tb_${args.modelName}"
  val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

  if (args.database != "" && args.schema != "") {
    print("正在创建项目..." + args.modelName)
    hiveContext.sql(s"CREATE DATABASE IF NOT EXISTS ${args.database}")

    print("正在构造扩展模型...")
    // Staging table: JSON files under loadPath, parsed by the HCatalog JSON SerDe.
    hiveContext.sql(
      s"CREATE TABLE IF NOT EXISTS ${args.database}.$tbJsonSerde(${args.schema}) " +
        "row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' " +
        s"LOCATION '$loadPath/'"
    )

    // Build the CREATE TABLE ... AS SELECT once, log it, then execute it
    // (the original duplicated the full SQL string between println and sql).
    val createTableSql =
      s"CREATE TABLE IF NOT EXISTS ${args.database}.$tb as select ${args.schema_tb} " +
        s"from ${args.database}.$tbJsonSerde " +
        s"LATERAL VIEW explode($tbJsonSerde.data) b AS data"
    println(createTableSql)
    hiveContext.sql(createTableSql)

    println(args.modelName + " 扩展模型加载已完成!")
  }
}
// NOTE(review): the .size result is discarded; kept as-is in case the
// collection is lazy/stateful (an Iterator would be consumed here) — TODO
// confirm whether this line can be removed.
hiveConfigList.size
// Load every configured model into Hive.
hiveConfigList.foreach(loadDataToHive)