Sorting happens on k2, so k2 must be comparable, which means it has to implement the WritableComparable interface.
But what if other fields (for example, some of the fields in v2) should also take part in the sort?
Then k2 has to be redefined: put every field that should participate in the sort into k2.
Here is the implementation in code.
Suppose the input data currently has this structure (two tab-separated integers per line):
3 3
3 2
3 1
2 2
2 1
1 1
The code:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoIntSortApp {

    // args[0] = input path, args[1] = output path
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), TwoIntSortApp.class.getSimpleName());
        job.setJarByClass(TwoIntSortApp.class);
        FileInputFormat.setInputPaths(job, args[0]);

        job.setMapperClass(TwoIntSortMapper.class);
        job.setMapOutputKeyClass(TwoInt.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(TwoIntSortReducer.class);
        job.setOutputKeyClass(TwoInt.class);
        job.setOutputValueClass(NullWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }

    public static class TwoIntSortMapper extends Mapper<LongWritable, Text, TwoInt, NullWritable> {
        TwoInt k2 = new TwoInt();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line holds two tab-separated integers; both of them go into the key.
            String[] splited = value.toString().split("\t");
            k2.set(splited[0], splited[1]);
            context.write(k2, NullWritable.get());
            System.out.println("Mapper ----- first: " + k2.first + " second: " + k2.second);
        }
    }

    public static class TwoIntSortReducer extends Reducer<TwoInt, NullWritable, TwoInt, NullWritable> {
        int i = 1;

        @Override
        protected void reduce(TwoInt k2, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(k2, NullWritable.get());
            System.out.println("reduce call #" + (i++));
            System.out.println("Reducer ----- first: " + k2.first + " second: " + k2.second);
        }
    }

    public static class TwoInt implements WritableComparable<TwoInt> {
        int first;
        int second;

        public void set(String s1, String s2) {
            this.first = Integer.parseInt(s1);
            this.second = Integer.parseInt(s2);
        }

        public void write(DataOutput out) throws IOException {
            out.writeInt(first);
            out.writeInt(second);
        }

        public void readFields(DataInput in) throws IOException {
            this.first = in.readInt();
            this.second = in.readInt();
        }

        // Sort ascending by the first column, then by the second column.
        // Integer.compare avoids the overflow risk of subtracting the two values.
        public int compareTo(TwoInt o) {
            int cmp = Integer.compare(this.first, o.first);
            if (cmp != 0) {
                return cmp;
            }
            return Integer.compare(this.second, o.second);
        }

        // A custom key should also behave well under hashing, because the default
        // HashPartitioner assigns keys to reducers by hashCode().
        @Override
        public int hashCode() {
            return first * 31 + second;
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof TwoInt)) {
                return false;
            }
            TwoInt other = (TwoInt) obj;
            return this.first == other.first && this.second == other.second;
        }

        @Override
        public String toString() {
            return this.first + "\t" + this.second;
        }
    }
}
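
With this key, the shuffle sorts the records ascending by the first column and then by the second. For the sample input above every key is distinct, so the reducer is called six times and the output should look like:

1	1
2	1
2	2
3	1
3	2
3	3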
//==============================================================
Setting the Combiner class on the job: the reducer can be reused as the combiner here, because its input and output key/value types both match the map output types (TwoInt / NullWritable). A custom grouping comparator is also registered so that keys sharing the same first column are handed to a single reduce call (the class below additionally needs import org.apache.hadoop.io.WritableComparator):

job.setCombinerClass(TwoIntSortReducer.class);              // set the Combiner class
job.setGroupingComparatorClass(MyGroupingCompartor.class);  // set the custom grouping comparator
public static class MyGroupingCompartor extends WritableComparator {
    // Register TwoInt and let the superclass create key instances; without this
    // constructor the raw-bytes compare inherited from WritableComparator fails at runtime.
    protected MyGroupingCompartor() {
        super(TwoInt.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TwoInt aa = (TwoInt) a;
        TwoInt bb = (TwoInt) b;
        // Keys with the same first column are treated as one group:
        /*
         * 1 1
         * 2 1
         * 2 2
         * 3 1
         * 3 2
         * 3 3
         * -> three groups
         */
        return Integer.compare(aa.first, bb.first);
    }
}
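
With this grouping comparator in place the reducer runs once per group, i.e. three times for the sample data, and writes the first key it receives in each group. The object-level compare above deserializes both keys on every comparison; a byte-level comparator is the usual optimization. The sketch below is not from the original post and assumes the key layout produced by TwoInt.write(), i.e. that the first field occupies the leading four bytes of the serialized key:

public static class TwoIntFirstRawComparator extends WritableComparator {
    protected TwoIntFirstRawComparator() {
        super(TwoInt.class, true);
    }

    // Group by the first int of each serialized key without deserializing it
    // (assumes write() emits the first field as the leading four bytes).
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Integer.compare(readInt(b1, s1), readInt(b2, s2));
    }
}

Note that if the job runs with more than one reduce task, a partitioner that also looks only at the first column is needed as well, otherwise keys sharing a first column can land on different reducers and never meet in the same group.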
This article was reposted from the SummerChill blog on cnblogs. Original link: http://www.cnblogs.com/DreamDrive/p/5678175.html. To reprint, please contact the original author.