Sample input, two of the three files (the expected output below also references a c.txt whose contents are not shown here):

a.txt:
hello tom
hello jerry
hello tom

b.txt:
hello jerry
hello jerry
tom jerry

Expected inverted-index output, one word per line followed by its file->count postings:

hello a.txt->3 b.txt->2 c.txt->2
jerry b.txt->3 a.txt->1 c.txt->1
tom a.txt->2 b.txt->1 c.txt->1
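Before the Hadoop job itself, here is a minimal in-memory sketch of the same map -> combine -> reduce flow in plain Java, useful for tracing how the records above become the output. Since c.txt's contents are not shown, the values used for it here are an assumption chosen only to reproduce the expected counts (the order of postings per word may differ):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InverseIndexTrace {
    public static void main(String[] args) {
        Map<String, String[]> files = new LinkedHashMap<>();
        files.put("a.txt", new String[]{"hello tom", "hello jerry", "hello tom"});
        files.put("b.txt", new String[]{"hello jerry", "hello jerry", "tom jerry"});
        // hypothetical c.txt contents, chosen to be consistent with the expected output above
        files.put("c.txt", new String[]{"hello tom", "hello jerry"});

        // map + combine: count occurrences per file under keys like "hello->a.txt"
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (Map.Entry<String, String[]> file : files.entrySet()) {
            for (String line : file.getValue()) {
                for (String word : line.split(" ")) {
                    combined.merge(word + "->" + file.getKey(), 1, Integer::sum);
                }
            }
        }

        // shuffle + reduce: regroup by word and collect the "file->count" postings
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : combined.entrySet()) {
            String[] wordAndPath = e.getKey().split("->");
            grouped.computeIfAbsent(wordAndPath[0], w -> new ArrayList<>())
                   .add(wordAndPath[1] + "->" + e.getValue());
        }
        grouped.forEach((word, postings) ->
                System.out.println(word + "\t" + String.join("\t", postings)));
    }
}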
package itcastmr.inverseindex; // matches the fully qualified class name used in the run command below

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverseIndex {

    public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text k = new Text();
        private Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            // getInputSplit() reveals which split, and therefore which file, this mapper is reading
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            // use the bare file name (a.txt) so the result matches the sample output;
            // getPath().toString() would instead yield the full URI, e.g. hdfs://itcast:9000/ii/a.txt
            String path = inputSplit.getPath().getName();
            // emit k2,v2 pairs of the form hello->a.txt {1,1,1}
            for (String word : words) {
                k.set(word + "->" + path);
                v.set("1");
                context.write(k, v);
            }
        }
    }
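A quick standalone check of the two Path accessors used above (assuming hadoop-common is on the classpath); the original post used getPath().toString(), which yields the full URI rather than the bare file name shown in the expected output:

import org.apache.hadoop.fs.Path;

public class PathNameDemo {
    public static void main(String[] args) {
        Path p = new Path("hdfs://itcast:9000/ii/a.txt");
        System.out.println(p.toString()); // hdfs://itcast:9000/ii/a.txt
        System.out.println(p.getName());  // a.txt
    }
}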

    public static class IndexCombiner extends Reducer<Text, Text, Text, Text> {
        private Text k = new Text();
        private Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // turn k2,v2 = hello->a.txt {1,1,1} into k3,v3 = hello, a.txt->3
            int counter = 0;
            for (Text text : values) {
                counter += Integer.parseInt(text.toString());
            }
            String[] wordAndPath = key.toString().split("->");
            String word = wordAndPath[0];
            String path = wordAndPath[1];
            k.set(word);
            v.set(path + "->" + counter);
            context.write(k, v);
        }
    }
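With the sample data, the grouped call the reducer receives for one key looks roughly like this (a trace, not real API syntax):

reduce("hello", ["a.txt->3", "b.txt->2", "c.txt->2"], context)

One caveat: Hadoop treats the combiner as an optimization that may run zero, one, or several times, and this combiner rewrites the key (hello->a.txt becomes hello). The job therefore only produces the output shown at the top when the combiner actually runs; a more defensive design would do the per-file counting in a separate first job.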

    public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
        private Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // The framework groups every record with the same key before calling reduce(),
            // which is why values is an Iterable: the k2s arriving from different map tasks
            // are merged so that each key's values form a single collection here.
            // The combiner has already sent pairs like hello, a.txt->3; concatenate them
            // into one tab-separated posting list per word.
            StringBuilder result = new StringBuilder();
            for (Text t : values) {
                result.append(t.toString()).append("\t");
            }
            v.set(result.toString());
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(InverseIndex.class);

        job.setMapperClass(IndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(IndexCombiner.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(IndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1); // exit code 0 on success, 1 on failure
    }
}
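One way to build the jar referenced in the run command below, a sketch assuming the source file sits in the current directory and a Hadoop client is installed (your paths and jar name may differ):

javac -classpath "$(hadoop classpath)" -d . InverseIndex.java
jar cf /root/itcastmr.jar itcastmr/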
hadoop jar /root/itcastmr.jar itcastmr.inverseindex.InverseIndex /user/root/InverseIndex /InverseIndexResult
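Assuming the job completes, the index can be inspected with the standard HDFS shell; part-r-00000 is the default output file name for a single reducer:

hadoop fs -cat /InverseIndexResult/part-r-00000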