
Hadoop: Chaining multiple MapReduce2 jobs in sequence

Date: 2015-05-29

In complex MapReduce processing it is often necessary to break the work into several simpler jobs, where the output of the first job becomes the input of the second, and so on, with dependencies between them. Taking the average calculation from the previous article as an example, it can be decomposed into three steps:

1. Compute the sum

2. Compute the count

3. Compute the average
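For example, with a hypothetical input file containing the four values 2, 5, 8 and 9 (one per line), step 1 produces Sum = 24, step 2 produces Count = 4, and step 3 computes Avg = 24 / 4 = 6.0.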

Each step is treated as a separate job. Job3 must wait for Job1 and Job2 to finish and takes their output as its input. The following code demonstrates how to chain these three jobs together:

package yjmyzz.mr.job.link;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;


public class Avg2 {

    private static final Text TEXT_SUM = new Text("SUM");
    private static final Text TEXT_COUNT = new Text("COUNT");
    private static final Text TEXT_AVG = new Text("AVG");

    // Job1: compute the sum
    public static class SumMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        public long sum = 0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            sum += Long.parseLong(value.toString());
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_SUM, new LongWritable(sum));
        }
    }

    public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        public long sum = 0;

        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v : values) {
                sum += v.get();
            }
            context.write(TEXT_SUM, new LongWritable(sum));
        }
    }

    // Job2: compute the count
    public static class CountMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        public long count = 0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            count += 1;
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_COUNT, new LongWritable(count));
        }
    }

    public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        public long count = 0;

        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v : values) {
                count += v.get();
            }
            context.write(TEXT_COUNT, new LongWritable(count));
        }
    }

    // Job3: compute the average from the outputs of Job1 and Job2
    public static class AvgMapper
            extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

        public long count = 0;
        public long sum = 0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "SUM<TAB>value" or "COUNT<TAB>value"
            String[] v = value.toString().split("\t");
            if (v[0].equals("COUNT")) {
                count = Long.parseLong(v[1]);
            } else if (v[0].equals("SUM")) {
                sum = Long.parseLong(v[1]);
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), new LongWritable(count));
        }
    }

    public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> {

        public long sum = 0;
        public long count = 0;

        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            sum += key.get();
            for (LongWritable v : values) {
                count += v.get();
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_AVG, new DoubleWritable(new Double(sum) / count));
        }
    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        String inputPath = "/input/duplicate.txt";
        String maxOutputPath = "/output/max/";
        String countOutputPath = "/output/count/";
        String avgOutputPath = "/output/avg/";

        // Delete the output directories first (optional; avoids the
        // "output directory already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, maxOutputPath);
        HDFSUtil.deleteFile(conf, countOutputPath);
        HDFSUtil.deleteFile(conf, avgOutputPath);

        Job job1 = Job.getInstance(conf, "Sum");
        job1.setJarByClass(Avg2.class);
        job1.setMapperClass(SumMapper.class);
        job1.setCombinerClass(SumReducer.class);
        job1.setReducerClass(SumReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job1, new Path(inputPath));
        FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath));


        Job job2 = Job.getInstance(conf, "Count");
        job2.setJarByClass(Avg2.class);
        job2.setMapperClass(CountMapper.class);
        job2.setCombinerClass(CountReducer.class);
        job2.setReducerClass(CountReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job2, new Path(inputPath));
        FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));


        Job job3 = Job.getInstance(conf, "Average");
        job3.setJarByClass(Avg2.class);
        job3.setMapperClass(AvgMapper.class);
        job3.setReducerClass(AvgReducer.class);
        job3.setMapOutputKeyClass(LongWritable.class);
        job3.setMapOutputValueClass(LongWritable.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(DoubleWritable.class);

        // Use the output of job1 and job2 as the input of job3
        FileInputFormat.addInputPath(job3, new Path(maxOutputPath));
        FileInputFormat.addInputPath(job3, new Path(countOutputPath));
        FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));

        // Submit job1 and job2 (one after another) and wait for both to finish,
        // then submit job3
        if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {
            System.exit(job3.waitForCompletion(true) ? 0 : 1);
        }
        // job1 or job2 failed
        System.exit(1);
    }
}
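Both job1 and job2 write their results with Hadoop's default TextOutputFormat, which separates key and value with a tab character; that tab is exactly what AvgMapper splits on. With the hypothetical input above (values summing to 24 across 4 lines), the intermediate files would look roughly like this:

/output/max/part-r-00000:
SUM	24

/output/count/part-r-00000:
COUNT	4

Job3 reads both directories, so AvgMapper picks up both records, cleanup() emits the pair (24, 4), and AvgReducer finally writes "AVG	6.0" to /output/avg/.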

The input text can be found in the previous article. The main ideas behind the code above are:

1. The Sum and Count jobs both read the same input, /input/duplicate.txt, and write their results to /output/max/ and /output/count/ respectively.

2. The Avg job takes the results under /output/max and /output/count as its input, picks up the sum and count values according to their keys, then computes the average and writes it to /output/avg/.
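One limitation of the main() above is that the two independent jobs run strictly one after another, because each waitForCompletion() call blocks until its job finishes. Hadoop also ships a JobControl / ControlledJob API (in org.apache.hadoop.mapreduce.lib.jobcontrol) that lets you declare the dependencies explicitly and have independent jobs submitted concurrently. The following is a minimal sketch, not part of the original article, assuming job1, job2 and job3 have been configured exactly as in main() above; it would replace the final waitForCompletion block:

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

// Wrap the three jobs so their dependencies can be declared
ControlledJob cj1 = new ControlledJob(job1.getConfiguration());
cj1.setJob(job1);
ControlledJob cj2 = new ControlledJob(job2.getConfiguration());
cj2.setJob(job2);
ControlledJob cj3 = new ControlledJob(job3.getConfiguration());
cj3.setJob(job3);

// job3 may only start once job1 and job2 have both succeeded
cj3.addDependingJob(cj1);
cj3.addDependingJob(cj2);

JobControl control = new JobControl("avg-chain");
control.addJob(cj1);
control.addJob(cj2);
control.addJob(cj3);

// JobControl is a Runnable: run it in its own thread and poll until all jobs finish
Thread t = new Thread(control);
t.start();
while (!control.allFinished()) {
    Thread.sleep(1000);
}
control.stop();
System.exit(control.getFailedJobList().isEmpty() ? 0 : 1);

Whether this is worth the extra code depends on the workload; for two small jobs like Sum and Count, the sequential waitForCompletion() calls in the original are perfectly adequate.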

Original article: https://yq.aliyun.com/articles/250805