使用老版本的java api提交hadoop作业
还是使用之前的单词计数的例子
自定义Mapper类
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; //自定义的Mapper类必须继承MapReduceBase 并且实现Mapper接口 public class JMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> { @Override public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> collector, Reporter reporter) throws IOException { String[] ss = value.toString().split("\t"); for (String s : ss) { //使用collector.collect而不是context.write collector.collect(new Text(s), new LongWritable(1)); } } }
自定义Reducer类
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; //自定义的Reducer类必须继承MapReduceBase 并且实现Reducer接口 public class JReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> { @Override public void reduce(Text key, Iterator<LongWritable> value, OutputCollector<Text, LongWritable> collector, Reporter reporter) throws IOException { long sum = 0; //由于value不在可以用foreach循环,所以用while代替 while (value.hasNext()) { sum += value.next().get(); } collector.collect(key, new LongWritable(sum)); } }
运行提交代码的类JSubmit
import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class JSubmit { public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException { Path outPath = new Path("hdfs://localhost:9000/out"); Path inPath = new Path("/home/hadoop/word"); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf); if (fs.exists(outPath)) { fs.delete(outPath, true); } // 使用JobConf 而不是Job JobConf job = new JobConf(conf, JSubmit.class); FileInputFormat.setInputPaths(job, inPath); job.setInputFormat(TextInputFormat.class); job.setMapperClass(JMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(JReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job, outPath); job.setOutputFormat(TextOutputFormat.class); // 使用JobClient.runJob而不是job.waitForCompletion JobClient.runJob(job); } }
可以看到
其实老版本的api差别不大,只是用了少数几个类替换了而已
注意老版本api的类虽然和新版本api的类名字很多都是一模一样的
但是所在的包不同,老版本所在的包都是mapred的,而新版本的都在mapreduce

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
通过java api提交自定义hadoop 作业
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/43734989 通过API操作之前要先了解几个基本知识 一、hadoop的基本数据类型和java的基本数据类型是不一样的,但是都存在对应的关系 如下图 如果需要定义自己的数据类型,则必须实现Writable hadoop的数据类型可以通过get方法获得对应的java数据类型 而java的数据类型可以通过hadoop数据类名的构造函数,或者set方法转换 二、hadoop提交作业的的步骤分为八个,可以理解为天龙八步 如下: map端工作: 1.1 读取要操作的文件--这步会将文件的内容格式化成键值对的形式,键为每一行的起始位置偏移,值为每一行的内容 1.2 调用map进行处理--在这步使用自定义的Mapper类来实现自己的逻辑,输入的数据为1.1格式化的键值对,输入的数据也是键值对的形式 1.3 对map的处理结果进行分区--map处理完毕之后可以根据自己的业务需求来对键值对进行分区处理,比如,将类型不同的结果保存在不同的文件中等。...
- 下一篇
在hadoop作业中自定义分区和归约
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/43735703 当遇到有特殊的业务需求时,需要对hadoop的作业进行分区处理 那么我们可以通过自定义的分区类来实现 还是通过单词计数的例子,JMapper和JReducer的代码不变,只是在JSubmit中改变了设置默认分区的代码,见代码: //1.3分区 //设置自定义分区类 job.setPartitionerClass(JPartitioner.class); //设置分区个数--这里设置成2,代表输出分为2个区,由两个reducer输出 job.setNumReduceTasks(2); 自定义的JPartitioner代码如下: import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; //自定义...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8编译安装MySQL8.0.19
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Red5直播服务器,属于Java语言的直播服务器
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS6,CentOS7官方镜像安装Oracle11G
- MySQL8.0.19开启GTID主从同步CentOS8
- Docker安装Oracle12C,快速搭建Oracle学习环境
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作