
Submitting a Hadoop Job with the Old Java API

Date: 2015-02-10
Copyright notice: This is an original post by the author; do not repost without permission. https://blog.csdn.net/qq1010885678/article/details/43735491

We will reuse the word-count example from earlier posts.
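For reference, the mapper below splits each line on tab characters, so the input is assumed to be tab-separated words. The file contents here are hypothetical, just to make the expected behavior concrete:

    hello	world
    hello	hadoop

With TextOutputFormat (which joins key and value with a tab by default), the job should produce:

    hadoop	1
    hello	2
    world	1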


The custom Mapper class

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// In the old API, a custom Mapper must extend MapReduceBase and implement the Mapper interface
public class JMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, LongWritable> collector, Reporter reporter)
            throws IOException {
        // with TextInputFormat, key is the byte offset of the line and value is the line itself
        String[] ss = value.toString().split("\t");
        for (String s : ss) {
            // emit through collector.collect rather than context.write
            collector.collect(new Text(s), new LongWritable(1));
        }
    }
}
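The Reporter argument is unused above, but in the old API it is the hook for progress reporting and counters. A minimal sketch of what could go inside map; the group and counter names here are made up for illustration:

    // hypothetical counter update inside map(); group/counter names are invented
    reporter.incrCounter("wordcount", "input.lines", 1);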

The custom Reducer class

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// In the old API, a custom Reducer must extend MapReduceBase and implement the Reducer interface
public class JReducer extends MapReduceBase implements
        Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    public void reduce(Text key, Iterator<LongWritable> value,
            OutputCollector<Text, LongWritable> collector, Reporter reporter)
            throws IOException {
        long sum = 0;
        // the values arrive as an Iterator rather than an Iterable,
        // so a while loop replaces the foreach loop used with the new API
        while (value.hasNext()) {
            sum += value.next().get();
        }
        collector.collect(key, new LongWritable(sum));
    }
}
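Because this reduce function is a plain sum, the same class could also be registered as a combiner to pre-aggregate map output locally. This is an optional optimization, not part of the original code; it would go in the JobConf setup shown in the next section:

    // optional: run JReducer as a combiner on the map side (not in the original post)
    job.setCombinerClass(JReducer.class);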

The driver class JSubmit, which submits the job

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class JSubmit {
    public static void main(String[] args) throws IOException,
            URISyntaxException, InterruptedException, ClassNotFoundException {
        Path outPath = new Path("hdfs://localhost:9000/out");
        Path inPath = new Path("/home/hadoop/word");
        Configuration conf = new Configuration();
        // delete the output path if it already exists, otherwise the job will fail
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        // use JobConf instead of the new API's Job
        JobConf job = new JobConf(conf, JSubmit.class);
        FileInputFormat.setInputPaths(job, inPath);
        job.setInputFormat(TextInputFormat.class);
        job.setMapperClass(JMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(JReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, outPath);
        job.setOutputFormat(TextOutputFormat.class);
        // use JobClient.runJob instead of job.waitForCompletion
        JobClient.runJob(job);
    }
}
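JobClient.runJob blocks until the job finishes, printing progress as it goes. The old API also supports non-blocking submission via JobClient.submitJob; a minimal sketch of that variant (not part of the original post, and requiring an extra import of org.apache.hadoop.mapred.RunningJob):

    // non-blocking alternative to JobClient.runJob(job)
    JobClient client = new JobClient(job);
    RunningJob running = client.submitJob(job);
    while (!running.isComplete()) {
        Thread.sleep(1000);  // poll until the job finishes
    }
    System.out.println(running.isSuccessful() ? "job succeeded" : "job failed");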

As you can see, the old API does not differ much from the new one; only a handful of classes are swapped out.

Note that although many class names are identical between the two APIs, they live in different packages: the old API classes are under org.apache.hadoop.mapred, while the new ones are under org.apache.hadoop.mapreduce.
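For comparison, here is a minimal sketch of the same mapper written against the new API (the class name NewApiMapper is made up for illustration):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;  // mapreduce package, not mapred

public class NewApiMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String s : value.toString().split("\t")) {
            // context.write replaces collector.collect
            context.write(new Text(s), new LongWritable(1));
        }
    }
}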




Original article: https://yq.aliyun.com/articles/667836