package
com.baidu.loan;
/**
 * Example invocation (arguments, in order: unit input path, iterial input path, output path):
 *
 * /home/users/ouerqiang/hadoop/hadoop-client-palo/hadoop/bin/hadoop jar LoanIdeaInfoText.jar com.baidu.loan.LoanIdeainfoJoinIterialByDAILI6 /test/fbiz/loan/ideainfo/LoanIdeainfoByDAILIUnitID_0928 /test/fbiz/loan/ideainfo/LoanIterialByDAI_0928 /test/fbiz/loan/ideainfo/LoanIdeainfoJoinIterialByDAILI6_1_0928
 */
import
java.io.IOException;
import
java.util.Iterator;
import
org.apache.hadoop.mapred.FileOutputFormat;
import
org.apache.hadoop.mapred.JobClient;
import
org.apache.hadoop.mapred.JobConf;
import
org.apache.hadoop.mapred.MapReduceBase;
import
org.apache.hadoop.mapred.Mapper;
import
org.apache.hadoop.mapred.OutputCollector;
import
org.apache.hadoop.mapred.Partitioner;
import
org.apache.hadoop.mapred.Reducer;
import
org.apache.hadoop.mapred.Reporter;
import
org.apache.hadoop.mapred.TextInputFormat;
import
org.apache.hadoop.mapred.lib.MultipleInputs;
import
org.apache.hadoop.util.Tool;
import
org.apache.hadoop.util.ToolRunner;
import
org.apache.hadoop.conf.Configured;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.Text;
import
com.baidu.uilt.TextPair;
/**
 * Reduce-side join of "unit" records with "iterial" records on their first field,
 * written against the old {@code org.apache.hadoop.mapred} API.
 *
 * <p>Two inputs are wired in through {@link MultipleInputs}:
 * <ul>
 *   <li>the unit input (args[0]), read by {@link JoinUnitMapper}: comma-separated lines
 *       with exactly four fields and no tab-separated structure;</li>
 *   <li>the iterial input (args[1]), read by {@link JoinIterialMapper}: tab-separated
 *       lines with more than four fields.</li>
 * </ul>
 * Both mappers key their output by {@code TextPair(firstField, tag)}, with tag
 * {@code "0"} for unit rows and {@code "1"} for iterial rows. Records are partitioned
 * by the first half of the key ({@link KeyPartitioner}) and grouped with
 * {@code TextPair.FirstComparator}, so one {@code reduce} call sees every row that
 * shares a first field.
 *
 * <p>NOTE(review): the join relies on {@code TextPair} ordering tag "0" before tag "1"
 * inside a group. {@code TextPair} is declared outside this file - confirm its
 * {@code compareTo} sorts on the second field after the first.
 */
public class LoanIdeainfoJoinIterialByDAILI6 extends Configured implements Tool {

    /** Key tag attached to rows coming from the unit input. */
    private static final String UNIT_TAG = "0";

    /** Key tag attached to rows coming from the iterial input. */
    private static final String ITERIAL_TAG = "1";

    /** Field separator of the iterial input (passed to {@link String#split(String)}). */
    private static final String TAB = "\t";

    /** Field separator of the unit input (passed to {@link String#split(String)}). */
    private static final String COMMA = ",";

    /** A unit row must have exactly this many comma-separated fields. */
    private static final int UNIT_FIELD_COUNT = 4;

    /** An iterial row must have strictly more than this many tab-separated fields. */
    private static final int ITERIAL_FIELD_FLOOR = 4;

    /**
     * Tells whether a line has fewer than two tab-separated fields.
     *
     * <p>This is the tab half of the unit-row test. Every value emitted by
     * {@link JoinUnitMapper} satisfies it, and no value emitted by
     * {@link JoinIterialMapper} does (those have more than four tab fields), so the
     * reducer can use it to tell the two sides apart.
     *
     * @param line raw input line
     * @return {@code true} when splitting on tab yields fewer than two fields
     */
    private static boolean lacksTabFields(String line) {
        return line.split(TAB).length < 2;
    }

    /**
     * Mapper for the unit input: emits {@code (TextPair(field0, "0"), line)} for each
     * line that has fewer than two tab-separated fields and exactly four
     * comma-separated fields. Every other line is dropped silently.
     */
    public static class JoinUnitMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, TextPair, Text> {

        /**
         * @param key      input key supplied by the input format (unused)
         * @param value    one input line
         * @param output   collector receiving the tagged key and the untouched line
         * @param reporter progress reporter (unused)
         * @throws IOException if the collector fails
         */
        public void map(LongWritable key, Text value,
                OutputCollector<TextPair, Text> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            if (!lacksTabFields(line)) {
                return;
            }
            // Split once and reuse the result for both the shape check and the join key.
            String[] fields = line.split(COMMA);
            if (fields.length != UNIT_FIELD_COUNT) {
                return;
            }
            output.collect(new TextPair(fields[0], UNIT_TAG), value);
        }
    }

    /**
     * Mapper for the iterial input: emits {@code (TextPair(field0, "1"), line)} for
     * each line with more than four tab-separated fields. Every other line is dropped
     * silently.
     */
    public static class JoinIterialMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, TextPair, Text> {

        /**
         * @param key      input key supplied by the input format (unused)
         * @param value    one input line
         * @param output   collector receiving the tagged key and the untouched line
         * @param reporter progress reporter (unused)
         * @throws IOException if the collector fails
         */
        public void map(LongWritable key, Text value,
                OutputCollector<TextPair, Text> output, Reporter reporter)
                throws IOException {
            // Split once and reuse the result for both the shape check and the join key.
            String[] fields = value.toString().split(TAB);
            if (fields.length > ITERIAL_FIELD_FLOOR) {
                output.collect(new TextPair(fields[0], ITERIAL_TAG), value);
            }
        }
    }

    /**
     * Joins the rows of one group: the first value is taken as the unit row and is
     * emitted as the output key once for every following value, which becomes the
     * output value.
     */
    public static class JoinReducer extends MapReduceBase
            implements Reducer<TextPair, Text, Text, Text> {

        /**
         * @param key      first key of the group (not read here)
         * @param values   all rows sharing the key's first field; the unit row is
         *                 expected first (see the class-level ordering note)
         * @param output   collector receiving {@code (unitRow, otherRow)} pairs
         * @param reporter progress reporter (unused)
         * @throws IOException if the collector fails
         */
        public void reduce(TextPair key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            if (!values.hasNext()) {
                return;
            }
            // Copy the first value, as the original code did: the iterator may hand
            // back the same Text instance on every next() call.
            Text unitRow = new Text(values.next());

            // A group whose first row is tab-structured contains no unit row at all
            // (unit rows never have two or more tab fields, iterial rows always do).
            // Without this guard the first iterial row would be emitted as if it were
            // the unit row, paired with the remaining iterial rows.
            if (!lacksTabFields(unitRow.toString())) {
                return;
            }

            while (values.hasNext()) {
                output.collect(unitRow, values.next());
            }
        }
    }

    /**
     * Partitions on the first half of the composite key only, so rows carrying
     * different tags but the same first field land in the same partition.
     */
    public static class KeyPartitioner implements Partitioner<TextPair, Text> {

        /** No configuration is needed. */
        @Override
        public void configure(JobConf job) {}

        /**
         * @param key           composite map output key
         * @param value         map output value (ignored)
         * @param numPartitions number of partitions
         * @return partition index derived from the hash of {@code key.getFirst()};
         *         the sign bit is masked off so the result is never negative
         */
        @Override
        public int getPartition(TextPair key, Text value, int numPartitions) {
            int nonNegativeHash = key.getFirst().hashCode() & Integer.MAX_VALUE;
            return nonNegativeHash % numPartitions;
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args exactly three arguments: unit input path, iterial input path,
     *             output path
     * @return {@code 0} after {@code JobClient.runJob} returns, {@code -1} when the
     *         argument count is wrong (a usage message is printed to stderr)
     * @throws Exception propagated from job setup or {@code JobClient.runJob}
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.printf(
                    "Usage: %s [generic options] <unit input> <iterial input> <output>%n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Path unitInput = new Path(args[0]);
        Path iterialInput = new Path(args[1]);
        Path outputPath = new Path(args[2]);

        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("Join record with station name");

        MultipleInputs.addInputPath(conf, unitInput,
                TextInputFormat.class, JoinUnitMapper.class);
        MultipleInputs.addInputPath(conf, iterialInput,
                TextInputFormat.class, JoinIterialMapper.class);
        FileOutputFormat.setOutputPath(conf, outputPath);

        // Partition and group on the first key field only, so one reduce call
        // receives both the unit row and the iterial rows of a join key.
        conf.setPartitionerClass(KeyPartitioner.class);
        conf.setOutputValueGroupingComparator(TextPair.FirstComparator.class);

        conf.setMapOutputKeyClass(TextPair.class);
        conf.setReducerClass(JoinReducer.class);
        conf.setOutputKeyClass(Text.class);

        JobClient.runJob(conf);
        return 0;
    }

    /**
     * Command-line entry point: runs the tool through {@link ToolRunner} and exits
     * the JVM with the code returned by {@link #run(String[])}.
     *
     * @param args command-line arguments, forwarded to {@link ToolRunner}
     * @throws Exception propagated from {@link ToolRunner#run}
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new LoanIdeainfoJoinIterialByDAILI6(), args);
        System.exit(exitCode);
    }
}