Hadoop: Chaining Multiple MapReduce2 Jobs in Series
In complex MapReduce processing, it is often necessary to break a complicated workflow down into several simpler Jobs, where the output of one Job becomes the input of the next, so the Jobs depend on one another. Taking the average calculation from the previous post as an example, it can be split into three steps:
1. Compute the sum
2. Compute the count
3. Compute the average
Each step is treated as a separate Job. Job 3 must wait until Job 1 and Job 2 have finished, and takes their output as its input. The code below shows how to chain these three Jobs together:
package yjmyzz.mr.job.link;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;


public class Avg2 {

    private static final Text TEXT_SUM = new Text("SUM");
    private static final Text TEXT_COUNT = new Text("COUNT");
    private static final Text TEXT_AVG = new Text("AVG");

    // Job 1: compute the sum
    public static class SumMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        public long sum = 0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            sum += Long.parseLong(value.toString());
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_SUM, new LongWritable(sum));
        }
    }

    public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        public long sum = 0;

        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v : values) {
                sum += v.get();
            }
            context.write(TEXT_SUM, new LongWritable(sum));
        }
    }

    // Job 2: compute the count
    public static class CountMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        public long count = 0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            count += 1;
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_COUNT, new LongWritable(count));
        }
    }

    public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        public long count = 0;

        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v : values) {
                count += v.get();
            }
            context.write(TEXT_COUNT, new LongWritable(count));
        }
    }

    // Job 3: compute the average from the outputs of Job 1 and Job 2
    public static class AvgMapper
            extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

        public long count = 0;
        public long sum = 0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each input line is "SUM\t<value>" or "COUNT\t<value>"
            String[] v = value.toString().split("\t");
            if (v[0].equals("COUNT")) {
                count = Long.parseLong(v[1]);
            } else if (v[0].equals("SUM")) {
                sum = Long.parseLong(v[1]);
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), new LongWritable(count));
        }
    }

    public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> {

        public long sum = 0;
        public long count = 0;

        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            sum += key.get();
            for (LongWritable v : values) {
                count += v.get();
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_AVG, new DoubleWritable((double) sum / count));
        }
    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        String inputPath = "/input/duplicate.txt";
        String maxOutputPath = "/output/max/";
        String countOutputPath = "/output/count/";
        String avgOutputPath = "/output/avg/";

        // Delete the output directories first (optional, but avoids the
        // "output directory already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, maxOutputPath);
        HDFSUtil.deleteFile(conf, countOutputPath);
        HDFSUtil.deleteFile(conf, avgOutputPath);

        Job job1 = Job.getInstance(conf, "Sum");
        job1.setJarByClass(Avg2.class);
        job1.setMapperClass(SumMapper.class);
        job1.setCombinerClass(SumReducer.class);
        job1.setReducerClass(SumReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job1, new Path(inputPath));
        FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath));

        Job job2 = Job.getInstance(conf, "Count");
        job2.setJarByClass(Avg2.class);
        job2.setMapperClass(CountMapper.class);
        job2.setCombinerClass(CountReducer.class);
        job2.setReducerClass(CountReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job2, new Path(inputPath));
        FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));

        Job job3 = Job.getInstance(conf, "Average");
        job3.setJarByClass(Avg2.class);
        job3.setMapperClass(AvgMapper.class);
        job3.setReducerClass(AvgReducer.class);
        job3.setMapOutputKeyClass(LongWritable.class);
        job3.setMapOutputValueClass(LongWritable.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(DoubleWritable.class);

        // Use the outputs of job1 and job2 as the input of job3
        FileInputFormat.addInputPath(job3, new Path(maxOutputPath));
        FileInputFormat.addInputPath(job3, new Path(countOutputPath));
        FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));

        // Submit job1 and job2 and wait for them to complete before running job3
        if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {
            System.exit(job3.waitForCompletion(true) ? 0 : 1);
        }
    }
}
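In the driver above, job1 and job2 are submitted one after the other with waitForCompletion, even though they do not depend on each other. As a side note, Hadoop also ships a small workflow API in org.apache.hadoop.mapreduce.lib.jobcontrol (JobControl and ControlledJob) that lets the dependency between jobs be declared explicitly and can submit independent jobs at the same time. The following is only a minimal sketch, assuming the job1/job2/job3 objects configured in main() above; it would replace the final waitForCompletion block:

// Assumed additional imports (not part of the listing above):
// import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
// import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

// Wrap each Job in a ControlledJob and declare that job3 depends on job1 and job2
ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
cJob1.setJob(job1);
ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
cJob2.setJob(job2);
ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());
cJob3.setJob(job3);
cJob3.addDependingJob(cJob1);
cJob3.addDependingJob(cJob2);

JobControl jobControl = new JobControl("avg-chain");
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);
jobControl.addJob(cJob3);

// JobControl implements Runnable: run it in a background thread and poll until all jobs finish
Thread workflow = new Thread(jobControl);
workflow.start();
while (!jobControl.allFinished()) {
    Thread.sleep(1000);
}
jobControl.stop();
System.exit(jobControl.getFailedJobList().isEmpty() ? 0 : 1);

Because cJob1 and cJob2 have no dependencies of their own, JobControl is free to submit them concurrently, while cJob3 only starts after both have succeeded.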
The input file can be found in the previous post. The main idea of the Avg2 listing above:
1. The Sum and Count jobs both read the same input, /input/duplicate.txt, and write their results to /output/max/ and /output/count/ respectively.
2. The Avg job takes the files under /output/max and /output/count as its input, picks up the sum and count values by matching the keys, then computes the average and writes it to /output/avg/.
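For reference, and assuming the input file contains the eleven sample values used in the previous post (2 8 8 3 2 3 5 3 0 2 7), the three output directories would end up with roughly the following tab-separated key/value pairs; the tab is exactly what AvgMapper splits on:

/output/max/part-r-00000      SUM     43
/output/count/part-r-00000    COUNT   11
/output/avg/part-r-00000      AVG     3.9090909...   (43 / 11)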
