package
whut;
import
java.io.IOException;
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.conf.Configured;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.mapreduce.Mapper.Context;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import
org.apache.hadoop.util.Tool;
import
org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the "SecondarySort" MapReduce job.
 *
 * <p>Records are read with {@link KeyValueTextInputFormat}. The mapper turns every record into a
 * composite {@code TextInt} key plus an {@link IntWritable} value, and the reducer joins all
 * values it receives for one group into a single comma-separated line. Partitioning, grouping
 * and sorting are delegated to {@code KeyPartitioner}, {@code TextComparator} and
 * {@code TextIntComparator}, which are declared outside this file.
 *
 * <p>Usage: {@code SortMain <input path> <output path>}
 */
public class SortMain extends Configured implements Tool {

  /**
   * Emits a composite key (record key, numeric value) together with the numeric value itself,
   * so that the numeric part is available both for sorting and as the reducer input.
   */
  public static class GroupMapper extends Mapper<Text, Text, TextInt, IntWritable> {

    /** Output value; the same instance is refilled on every {@code map} call. */
    public IntWritable second = new IntWritable();

    /** Output key; the same instance is refilled on every {@code map} call. */
    public TextInt tx = new TextInt();

    /**
     * Parses the record value as an integer and writes {@code (key + value, value)}.
     *
     * @param key     record key supplied by the input format
     * @param value   record value; must contain a decimal integer (surrounding whitespace is
     *                ignored)
     * @param context task context used to emit the output pair
     * @throws NumberFormatException if the value is not a valid integer
     */
    @Override
    protected void map(Text key, Text value, Context context)
        throws IOException, InterruptedException {
      // Trim first: a trailing space or '\r' (e.g. Windows line endings) would otherwise make
      // parseInt fail and kill the task for an otherwise valid record.
      int lineInt = Integer.parseInt(value.toString().trim());

      tx.setFirstKey(key.toString());
      tx.setSecondKey(lineInt);
      second.set(lineInt);
      context.write(tx, second);
    }
  }

  /**
   * Joins all values delivered to one {@code reduce} call into a comma-separated string and
   * writes it under the first part of the composite key.
   */
  public static class GroupReduce extends Reducer<TextInt, IntWritable, Text, Text> {

    /**
     * Writes {@code (key.getFirstKey(), "v1,v2,...")} for the values of this group, in the
     * order the framework hands them over.
     *
     * @param key     composite key of the group
     * @param values  values belonging to the group
     * @param context task context used to emit the output pair
     */
    @Override
    protected void reduce(TextInt key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      // StringBuilder is sufficient here: the buffer never leaves this method.
      StringBuilder joined = new StringBuilder();
      for (IntWritable val : values) {
        // Put the separator in front of every element except the first, so nothing has to be
        // stripped afterwards.
        if (joined.length() > 0) {
          joined.append(',');
        }
        joined.append(val.get());
      }
      context.write(new Text(key.getFirstKey()), new Text(joined.toString()));
    }
  }

  /**
   * Configures and runs the job.
   *
   * @param args {@code args[0]} is the input path, {@code args[1]} the output path
   * @return {@code 0} if the job succeeded, {@code 1} if it failed, {@code 2} if the arguments
   *         are missing
   * @throws Exception if job setup or submission fails
   */
  @Override
  public int run(String[] args) throws Exception {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 2) {
      System.err.println("Usage: SortMain <input path> <output path>");
      ToolRunner.printGenericCommandUsage(System.err);
      return 2;
    }

    Configuration conf = getConf();
    // NOTE(review): newer Hadoop releases prefer Job.getInstance(conf, name); the constructor
    // is kept so the class still builds against the Hadoop version this project uses.
    Job job = new Job(conf, "SecondarySort");
    job.setJarByClass(SortMain.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(GroupMapper.class);
    job.setReducerClass(GroupReduce.class);

    job.setPartitionerClass(KeyPartitioner.class);
    job.setGroupingComparatorClass(TextComparator.class);
    // Key point: the sort comparator decides the order of the composite keys.
    job.setSortComparatorClass(TextIntComparator.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setMapOutputKeyClass(TextInt.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // waitForCompletion already reports whether the job succeeded.
    return job.waitForCompletion(true) ? 0 : 1;
  }

  /**
   * Command-line entry point; runs the tool through {@link ToolRunner} and exits the JVM with
   * the code returned by {@link #run(String[])}.
   *
   * @param args command-line arguments, passed on to {@link ToolRunner}
   * @throws Exception if the tool throws
   */
  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new SortMain(), args));
  }
}