package
com.mzsx.hadoop;
import
java.io.IOException;
import
java.util.Random;
import
java.util.StringTokenizer;
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.FileSystem;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.io.WritableComparable;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public
class
MySortWordCount {
public
static
class
MyMapper
extends
Mapper<Object, Text, Text, IntWritable> {
private
final
static
IntWritable one =
new
IntWritable(
1
);
private
Text word =
new
Text();
public
void
map(Object key, Text value, Context context)
throws
IOException, InterruptedException {
System.err.println(key +
","
+ value);
String tmp=value.toString();
tmp=tmp.replace(
'\''
,
' '
);
tmp=tmp.replace(
'.'
,
' '
);
tmp=tmp.replace(
','
,
' '
);
tmp=tmp.replace(
':'
,
' '
);
tmp=tmp.replace(
'!'
,
' '
);
tmp=tmp.replace(
';'
,
' '
);
tmp=tmp.replace(
'?'
,
' '
);
tmp=tmp.replace(
'`'
,
' '
);
tmp=tmp.replace(
'"'
,
' '
);
tmp=tmp.replace(
'&'
,
' '
);
tmp=tmp.replace(
'('
,
' '
);
tmp=tmp.replace(
')'
,
' '
);
tmp=tmp.replace(
'-'
,
' '
);
StringTokenizer itr =
new
StringTokenizer(tmp);
while
(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
};
}
public
static
class
MyReducer
extends
Reducer<Text, IntWritable, Text, IntWritable> {
private
IntWritable result =
new
IntWritable();
protected
void
reduce(Text key, Iterable<IntWritable> values,
Context context)
throws
IOException, InterruptedException {
System.err.println(key +
","
+ values);
int
sum =
0
;
for
(IntWritable val : values) {
sum += val.get();
}
result.set(sum);
;
context.write(key, result);
};
}
public
static
class
SortMapper
extends
Mapper<Object, Text, IntWritable,Text>{
public
void
map(Object key, Text value, Context context)
throws
IOException, InterruptedException {
IntWritable times =
new
IntWritable(
1
);
Text password =
new
Text();
String eachline=value.toString();
String[] eachterm =eachline.split(
"\t"
);
password.set(eachterm[
0
]);
times.set(Integer.parseInt(eachterm[
1
]));
context.write(times,password);
}
}
public
static
class
SortReducer
extends
Reducer<IntWritable,Text,IntWritable,Text> {
private
Text password =
new
Text();
public
void
reduce(IntWritable key,Iterable<Text> values, Context context)
throws
IOException, InterruptedException {
for
(Text val : values) {
password.set(val);
context.write(key,password);
}
}
}
private
static
class
IntDecreasingComparator
extends
IntWritable.Comparator {
public
int
compare(WritableComparable a, WritableComparable b) {
return
super
.compare(a, b);
}
public
int
compare(
byte
[] b1,
int
s1,
int
l1,
byte
[] b2,
int
s2,
int
l2) {
return
super
.compare(b1, s1, l1, b2, s2, l2);
}
}
/**
 * Runs two chained MapReduce jobs and exits with the overall status.
 *
 * <ol>
 *   <li>"Word Count": counts words in {@code /user/root/aoman.txt} and writes
 *       the result to a randomly named temporary directory.</li>
 *   <li>"csdnsort": reads that temporary directory, keys each record by its
 *       count, orders the keys with {@link IntDecreasingComparator} and writes
 *       to {@code /user/root/sort1}. It only runs if the first job succeeded.</li>
 * </ol>
 *
 * <p>The process exits with status 0 only when both jobs succeed, 1 otherwise.
 *
 * @param args command-line arguments; currently unused
 * @throws Exception if job setup, submission or monitoring fails
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // ---- Job 1: word count -------------------------------------------------
    Job job = new Job(conf, "Word Count");
    job.setJarByClass(MySortWordCount.class);
    job.setMapperClass(MyMapper.class);
    job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path("/user/root/aoman.txt"));

    // Intermediate output; the random suffix avoids clashes between runs.
    Path tempDir = new Path("MySortWordCount-temp-"
            + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
    FileOutputFormat.setOutputPath(job, tempDir);

    int exitCode = 1;
    try {
        if (job.waitForCompletion(true)) {
            // ---- Job 2: sort by count --------------------------------------
            Job sortJob = new Job(conf, "csdnsort");
            sortJob.setJarByClass(MySortWordCount.class);
            FileInputFormat.addInputPath(sortJob, tempDir);
            sortJob.setMapperClass(SortMapper.class);
            sortJob.setReducerClass(SortReducer.class);
            // A single reducer yields one output file ordered across all keys,
            // independent of the cluster's default reducer count.
            sortJob.setNumReduceTasks(1);
            FileOutputFormat.setOutputPath(sortJob, new Path("/user/root/sort1"));
            sortJob.setOutputKeyClass(IntWritable.class);
            sortJob.setOutputValueClass(Text.class);
            sortJob.setSortComparatorClass(IntDecreasingComparator.class);

            exitCode = sortJob.waitForCompletion(true) ? 0 : 1;
        }
        // A failed first job falls through with exitCode == 1; there is no
        // need to wait on it a second time.
    } finally {
        // Schedule removal of the intermediate directory on every path out
        // of here, including a failed or aborted first job.
        FileSystem.get(conf).deleteOnExit(tempDir);
    }
    System.exit(exitCode);
}
}