package StationPartitioner;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Hadoop version 1.1.2
 * MultipleOutputs example
 * @author 巧克力黑
 */
public class PartitionByStationUsingMultipleOutputs extends Configured
        implements Tool {

    enum Counter {
        LINESKIP
    }

    /**
     * Emits each record keyed by its weather-station ID so the reducer can
     * fan the records out into one output per station.
     */
    static class StationMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        private NcdcRecordParser parser = new NcdcRecordParser();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            try {
                parser.parse(value);
                output.collect(new Text(parser.getStationid()), value);
            } catch (Exception e) {
                // Count malformed lines instead of failing the job
                reporter.getCounter(Counter.LINESKIP).increment(1);
            }
        }
    }
    /**
     * Writes each station's records to a separate file via MultipleOutputs.
     * The "-" is stripped from the station ID because named-output names may
     * only contain letters and digits.
     */
    static class MultipleOutputReducer extends MapReduceBase
            implements Reducer<Text, Text, NullWritable, Text> {

        private MultipleOutputs multipleOutputs;

        @Override
        public void configure(JobConf jobconf) {
            multipleOutputs = new MultipleOutputs(jobconf);
        }

        @SuppressWarnings("unchecked")
        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            // Route this station's records to the multi named output
            // "station", qualified by the station ID
            OutputCollector collector = multipleOutputs.getCollector(
                    "station", key.toString().replace("-", ""), reporter);
            while (values.hasNext()) {
                collector.collect(NullWritable.get(), values.next());
            }
        }

        @Override
        public void close() throws IOException {
            // Flush and close all named outputs
            multipleOutputs.close();
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        // Access the remote HDFS as the "root" user
        System.setProperty("HADOOP_USER_NAME", "root");

        JobConf conf = new JobConf(getConf(), getClass());
        conf.setMapperClass(StationMapper.class);
        conf.setReducerClass(MultipleOutputReducer.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setOutputKeyClass(NullWritable.class);
        // All real output goes through MultipleOutputs, so suppress the
        // default (empty) part files
        conf.setOutputFormat(NullOutputFormat.class);

        FileInputFormat.setInputPaths(conf,
                new Path("hdfs://ubuntu:9000/sample1.txt")); // input path
        FileOutputFormat.setOutputPath(conf,
                new Path("hdfs://ubuntu:9000/temperature")); // output path

        // Declare a multi named output called "station", written as text
        MultipleOutputs.addMultiNamedOutput(conf, "station",
                TextOutputFormat.class, NullWritable.class, Text.class);

        JobClient.runJob(conf);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(
                new PartitionByStationUsingMultipleOutputs(), args);
        System.exit(exitCode);
    }
}
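The listing depends on an NcdcRecordParser class that is not shown. Below is a minimal sketch of what it might look like, assuming the fixed-width NCDC weather-record layout; the character offsets and the class body are assumptions for illustration, not part of the original post. Only the station ID is extracted, since that is all the mapper uses.

package StationPartitioner;

import org.apache.hadoop.io.Text;

/**
 * Minimal sketch of the assumed NcdcRecordParser, modeled on the
 * fixed-width NCDC record layout. The offsets below are assumptions.
 */
public class NcdcRecordParser {

    private String stationid;

    public void parse(Text record) {
        parse(record.toString());
    }

    public void parse(String record) {
        // Assumed layout: USAF id in chars 4-10, WBAN id in chars 10-15,
        // joined with a dash, e.g. "029070-99999". Short or malformed lines
        // throw here, which the mapper counts as LINESKIP.
        stationid = record.substring(4, 10) + "-" + record.substring(10, 15);
    }

    public String getStationid() {
        return stationid;
    }
}

With the multi named output declared in run(), each station's records should land in files named after the named output plus the multi name, on the order of station_02907099999-r-00000, under /temperature, while NullOutputFormat keeps the job from also writing empty part-r-* files.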